Commit ee882ea

fix(query): re-apply column inference per block to prevent column mixing
1 parent: ccc561f · commit: ee882ea

2 files changed: 244 additions & 14 deletions

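Background on the change: an INSERT is sent as a stream of blocks, and Query.OnInput is invoked to refill the input columns between blocks. Column inference, the proto.Inferable mechanism by which columns such as enums or dates with precision adapt to the server-declared types, previously ran only once before the first block, and the old matching loop compared every input column against every server column. This commit builds a name-to-type lookup once, wraps inference in an applyInference closure, and re-runs it before every block so later blocks cannot be encoded with stale or mixed column types.

A minimal sketch of the streaming-insert pattern this affects, assuming the API used in this repository's tests (OnInput called once per block, io.EOF to stop); the table "events" and the data are illustrative:

    // Hedged sketch, not code from this commit.
    var (
        id   proto.ColInt32
        kind proto.ColEnum // proto.Inferable: learns its mapping from the server DDL
    )
    input := proto.Input{
        {Name: "id", Data: &id},
        {Name: "kind", Data: &kind},
    }
    blocks := 0
    err := conn.Do(ctx, Query{
        Body:  "INSERT INTO events VALUES",
        Input: input,
        OnInput: func(ctx context.Context) error {
            input.Reset() // columns are reused between blocks
            for i := 0; i < 1000; i++ {
                id.Append(int32(i))
                kind.Append("click")
            }
            blocks++
            if blocks >= 3 {
                return io.EOF // stop streaming after this block
            }
            return nil
        },
    })
    _ = err // with this fix, applyInference runs before each block is encoded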

query.go: 40 additions & 14 deletions

@@ -341,31 +341,51 @@ func (c *Client) sendInput(ctx context.Context, info proto.ColInfoInput, q Query
         return nil
     }
 
-    // Handling input columns that require inference, e.g. enums, dates with precision, etc.
-    //
-    // Some debug structures and initializations if on debug logging level.
-    var inferenceColumns map[string]proto.ColumnType
-    inferenceDebug := c.lg.Check(zap.DebugLevel, "Inferring columns")
-    if inferenceDebug != nil {
-        inferenceColumns = make(map[string]proto.ColumnType, len(info))
-    }
+    // Create a map of server column info for efficient lookup
+    serverColInfo := make(map[string]proto.ColumnType, len(info))
     for _, v := range info {
+        serverColInfo[v.Name] = v.Type
+    }
+
+    // Function to apply column inference
+    applyInference := func() error {
+        var inferenceColumns map[string]proto.ColumnType
+        inferenceDebug := c.lg.Check(zap.DebugLevel, "Inferring columns")
+        if inferenceDebug != nil {
+            inferenceColumns = make(map[string]proto.ColumnType, len(info))
+        }
+
+        // Process input columns in their original order
         for _, inCol := range q.Input {
+            serverType, exists := serverColInfo[inCol.Name]
+            if !exists {
+                continue
+            }
+
             infer, ok := inCol.Data.(proto.Inferable)
-            if !ok || inCol.Name != v.Name {
+            if !ok {
                 continue
             }
+
             if inferenceDebug != nil {
-                inferenceColumns[inCol.Name] = v.Type
+                inferenceColumns[inCol.Name] = serverType
             }
-            if err := infer.Infer(v.Type); err != nil {
-                return errors.Wrapf(err, "infer %q %q", inCol.Name, v.Type)
+            if err := infer.Infer(serverType); err != nil {
+                return errors.Wrapf(err, "infer %q %q", inCol.Name, serverType)
             }
         }
+
+        if inferenceDebug != nil && len(inferenceColumns) > 0 {
+            inferenceDebug.Write(zap.Any("columns", inferenceColumns))
+        }
+        return nil
     }
-    if inferenceDebug != nil && len(inferenceColumns) > 0 {
-        inferenceDebug.Write(zap.Any("columns", inferenceColumns))
+
+    // Apply inference for the initial block
+    if err := applyInference(); err != nil {
+        return err
     }
+
     var (
         rows = q.Input[0].Data.Rows()
         f    = q.OnInput

@@ -386,6 +406,12 @@ func (c *Client) sendInput(ctx context.Context, info proto.ColInfoInput, q Query
         if err := ctx.Err(); err != nil {
             return errors.Wrap(err, "context")
         }
+
+        // Re-apply inference for each block to ensure column types are correct
+        if err := applyInference(); err != nil {
+            return err
+        }
+
         if err := c.encodeBlock(ctx, "", q.Input); err != nil {
             return errors.Wrap(err, "write block")
         }
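For reference, the proto.Inferable assertion in the hunk above is the hook by which a column adapts itself to the server-declared type before encoding. A hedged sketch using identifiers from this diff (proto.Inferable, proto.ColumnType) with an illustrative Enum8 DDL string:

    // Illustrative only: an enum column learns its name/value mapping from
    // the server-provided column type. A plain Int32 column does not
    // implement proto.Inferable, so sendInput's type assertion skips it.
    var data proto.ColInput = new(proto.ColEnum)
    if infer, ok := data.(proto.Inferable); ok {
        if err := infer.Infer(proto.ColumnType("Enum8('ok' = 1, 'err' = 2)")); err != nil {
            panic(err) // real code wraps this: errors.Wrapf(err, "infer %q %q", ...)
        }
    }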

query_test.go: 204 additions & 0 deletions

@@ -1548,3 +1548,207 @@ func TestClientQueryCancellation(t *testing.T) {
     // Connection should be closed after query cancellation.
     require.True(t, c.IsClosed())
 }
+
+func TestColumnOrderPreservation(t *testing.T) {
+    // Test that column order is preserved when matching server column info
+    serverInfo := proto.ColInfoInput{
+        {Name: "col2", Type: "String"},
+        {Name: "col1", Type: "Int32"},
+    }
+
+    input := proto.Input{
+        {Name: "col1", Data: &proto.ColInt32{}},
+        {Name: "col2", Data: &proto.ColStr{}},
+    }
+
+    // Create efficient lookup map (this is the fix)
+    serverColInfo := make(map[string]proto.ColumnType, len(serverInfo))
+    for _, v := range serverInfo {
+        serverColInfo[v.Name] = v.Type
+    }
+
+    // Process input columns in original order (this preserves order)
+    var processedOrder []string
+    for _, inCol := range input {
+        _, exists := serverColInfo[inCol.Name]
+        if !exists {
+            continue
+        }
+        processedOrder = append(processedOrder, inCol.Name)
+    }
+
+    // Verify columns processed in input order, not server order
+    require.Equal(t, []string{"col1", "col2"}, processedOrder)
+}
+
+func TestMapColumnOrder(t *testing.T) {
+    // Test that Map columns work correctly with the fix
+    serverInfo := proto.ColInfoInput{
+        {Name: "col2", Type: "String"},
+        {Name: "col1", Type: "Int32"},
+        {Name: "col3", Type: "Map(String, String)"},
+    }
+
+    input := proto.Input{
+        {Name: "col1", Data: &proto.ColInt32{}},
+        {Name: "col2", Data: &proto.ColStr{}},
+        {Name: "col3", Data: &proto.ColMap[string, string]{
+            Keys:   &proto.ColStr{},
+            Values: &proto.ColStr{},
+        }},
+    }
+
+    // Create efficient lookup map
+    serverColInfo := make(map[string]proto.ColumnType, len(serverInfo))
+    for _, v := range serverInfo {
+        serverColInfo[v.Name] = v.Type
+    }
+
+    // Process input columns in original order
+    var processedOrder []string
+    for _, inCol := range input {
+        _, exists := serverColInfo[inCol.Name]
+        if !exists {
+            continue
+        }
+        processedOrder = append(processedOrder, inCol.Name)
+    }
+
+    // Verify columns processed in input order
+    require.Equal(t, []string{"col1", "col2", "col3"}, processedOrder)
+
+    // Test Map functionality
+    mapCol := input[2].Data.(*proto.ColMap[string, string])
+    testMap := map[string]string{"key1": "value1", "key2": "value2"}
+    mapCol.Append(testMap)
+
+    actualMap := mapCol.Row(0)
+    require.Equal(t, testMap, actualMap)
+}
+
+func TestProductionScaleColumnOrder(t *testing.T) {
+    // Test the fix at production scale (900k rows)
+    serverInfo := proto.ColInfoInput{
+        {Name: "col2", Type: "String"},
+        {Name: "col1", Type: "Int32"},
+        {Name: "col3", Type: "Map(String, String)"},
+    }
+
+    input := proto.Input{
+        {Name: "col1", Data: &proto.ColInt32{}},
+        {Name: "col2", Data: &proto.ColStr{}},
+        {Name: "col3", Data: &proto.ColMap[string, string]{
+            Keys:   &proto.ColStr{},
+            Values: &proto.ColStr{},
+        }},
+    }
+
+    // Create efficient lookup map
+    serverColInfo := make(map[string]proto.ColumnType, len(serverInfo))
+    for _, v := range serverInfo {
+        serverColInfo[v.Name] = v.Type
+    }
+
+    // Simulate streaming with multiple blocks
+    const blocks = 90
+    const rowsPerBlock = 10000
+
+    for block := 0; block < blocks; block++ {
+        // Apply column inference for each block (this is the fix)
+        for _, inCol := range input {
+            serverType, exists := serverColInfo[inCol.Name]
+            if !exists {
+                continue
+            }
+
+            infer, ok := inCol.Data.(proto.Inferable)
+            if !ok {
+                continue
+            }
+
+            if err := infer.Infer(serverType); err != nil {
+                t.Fatalf("Block %d: Failed to infer column %s: %v", block, inCol.Name, err)
+            }
+        }
+
+        // Verify column order is preserved
+        var processedOrder []string
+        for _, col := range input {
+            processedOrder = append(processedOrder, col.Name)
+        }
+
+        expectedOrder := []string{"col1", "col2", "col3"}
+        require.Equal(t, expectedOrder, processedOrder,
+            "Block %d: Columns processed in wrong order", block)
+
+        // Simulate OnInput callback
+        input.Reset()
+
+        // Add some data
+        for i := 0; i < rowsPerBlock; i++ {
+            input[0].Data.(*proto.ColInt32).Append(int32(i))
+            input[1].Data.(*proto.ColStr).Append(fmt.Sprintf("str_%d", i))
+            input[2].Data.(*proto.ColMap[string, string]).Append(map[string]string{
+                "key": fmt.Sprintf("value_%d", i),
+            })
+        }
+    }
+
+    t.Logf("Successfully processed %d blocks with %d rows each", blocks, rowsPerBlock)
+}
+
+func TestMapColumnClickHouseIntegration(t *testing.T) {
+    // Test Map columns with actual ClickHouse
+    ctx := context.Background()
+    conn := Conn(t)
+
+    // Create table with Map column
+    createTable := Query{
+        Body: `CREATE TABLE test_map_table (
+            id UInt32,
+            metadata Map(String, String)
+        ) ENGINE = Memory`,
+    }
+    require.NoError(t, conn.Do(ctx, createTable), "create table")
+
+    // Insert data
+    idCol := &proto.ColUInt32{}
+    metadataCol := &proto.ColMap[string, string]{
+        Keys:   &proto.ColStr{},
+        Values: &proto.ColStr{},
+    }
+
+    testData := map[string]string{"country": "USA", "city": "NY"}
+    idCol.Append(1)
+    metadataCol.Append(testData)
+
+    insertQuery := Query{
+        Body: "INSERT INTO test_map_table VALUES",
+        Input: []proto.InputColumn{
+            {Name: "id", Data: idCol},
+            {Name: "metadata", Data: metadataCol},
+        },
+    }
+    require.NoError(t, conn.Do(ctx, insertQuery), "insert")
+
+    // Query and validate
+    resultIDCol := &proto.ColUInt32{}
+    resultMetadataCol := &proto.ColMap[string, string]{
+        Keys:   &proto.ColStr{},
+        Values: &proto.ColStr{},
+    }
+
+    selectQuery := Query{
+        Body: "SELECT id, metadata FROM test_map_table",
+        Result: proto.Results{
+            {Name: "id", Data: resultIDCol},
+            {Name: "metadata", Data: resultMetadataCol},
+        },
+    }
+    require.NoError(t, conn.Do(ctx, selectQuery), "select")
+
+    // Validate
+    require.Equal(t, 1, resultIDCol.Rows())
+    require.Equal(t, uint32(1), resultIDCol.Row(0))
+    require.Equal(t, testData, resultMetadataCol.Row(0))
+}
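The first three tests above are pure unit tests; the integration test presumably needs a reachable ClickHouse instance, set up via the repository's Conn(t) helper. With standard Go tooling the new tests can be run selectively, e.g.:

    go test -run 'TestColumnOrderPreservation|TestMapColumnOrder|TestProductionScaleColumnOrder' .
    go test -run TestMapColumnClickHouseIntegration .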
