Skip to content

Commit c6e7bd7

Browse files
committed
query hints in dsn
1 parent ef4aaae commit c6e7bd7

File tree

5 files changed

+61
-52
lines changed

5 files changed

+61
-52
lines changed

connection.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ const (
2222
type odpsConn struct {
2323
*http.Client
2424
*Config
25-
hints map[string]string
2625
}
2726

2827
// ODPS does not support transaction
@@ -130,7 +129,7 @@ func (conn *odpsConn) wait(query string, args []driver.Value) (string, error) {
130129
query = fmt.Sprintf(query, args)
131130
}
132131

133-
ins, err := conn.createInstance(newSQLJob(query, conn.hints))
132+
ins, err := conn.createInstance(newSQLJob(query, conn.QueryHints))
134133
if err != nil {
135134
return "", err
136135
}

driver.go

+5-15
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,16 @@ import (
88

99
// register driver
1010
func init() {
11-
sql.Register("maxcompute", &MaxcomputeDriver{})
11+
sql.Register("maxcompute", &Driver{})
1212
}
1313

14-
// MaxcomputeDriver impls database/sql/driver.Driver
15-
type MaxcomputeDriver struct {
16-
conn odpsConn
17-
}
14+
// impls database/sql/driver.Driver
15+
type Driver struct{}
1816

19-
func (o MaxcomputeDriver) Open(dsn string) (driver.Conn, error) {
17+
func (d Driver) Open(dsn string) (driver.Conn, error) {
2018
cfg, err := ParseDSN(dsn)
2119
if err != nil {
2220
return nil, err
2321
}
24-
o.conn = odpsConn{&http.Client{}, cfg, nil}
25-
return &o.conn, nil
26-
}
27-
28-
// SetQuerySettings sets the global query settings.
29-
// TODO(Yancey1989): add the one-off query settings interface.
30-
func (o MaxcomputeDriver) SetQuerySettings(hints map[string]string) error {
31-
o.conn.hints = hints
32-
return nil
22+
return &odpsConn{&http.Client{}, cfg}, nil
3323
}

driver_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99
)
1010

1111
var cfg4test = &Config{
12-
AccessID: os.Getenv("ODPS_ACCESS_ID"),
13-
AccessKey: os.Getenv("ODPS_ACCESS_KEY"),
14-
Project: os.Getenv("ODPS_PROJECT"),
15-
Endpoint: os.Getenv("ODPS_ENDPOINT"),
12+
AccessID: os.Getenv("ODPS_ACCESS_ID"),
13+
AccessKey: os.Getenv("ODPS_ACCESS_KEY"),
14+
Project: os.Getenv("ODPS_PROJECT"),
15+
Endpoint: os.Getenv("ODPS_ENDPOINT"),
16+
QueryHints: map[string]string{"hints_odps.sql.mapper.split_size": "16"},
1617
}
1718

1819
func TestSQLOpen(t *testing.T) {
@@ -26,7 +27,6 @@ func TestQuerySettings(t *testing.T) {
2627
a := assert.New(t)
2728
db, err := sql.Open("maxcompute", cfg4test.FormatDSN())
2829
a.NoError(err)
29-
db.Driver().(*MaxcomputeDriver).SetQuerySettings(map[string]string{"odps.sql.mapper.split.size": "16"})
3030
_, err = db.Query("SELECT * FROM gomaxcompute_test LIMIT;")
3131
a.NoError(err)
3232
}

dsn.go

+41-24
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gomaxcompute
22

33
import (
44
"fmt"
5+
"net/url"
56
"regexp"
67
"strings"
78
)
@@ -13,10 +14,11 @@ var (
1314
)
1415

1516
type Config struct {
16-
AccessID string
17-
AccessKey string
18-
Project string
19-
Endpoint string
17+
AccessID string
18+
AccessKey string
19+
Project string
20+
Endpoint string
21+
QueryHints map[string]string
2022
}
2123

2224
func ParseDSN(dsn string) (*Config, error) {
@@ -26,32 +28,41 @@ func ParseDSN(dsn string) (*Config, error) {
2628
}
2729
id, key, endpointURL := sub[1], sub[2], sub[3]
2830

29-
query := make(map[string]string)
30-
for _, s := range strings.Split(sub[4], "&") {
31-
pair := reQuery.FindStringSubmatch(s)
32-
if len(pair) != 3 {
33-
return nil, fmt.Errorf("dsn %s doesn't match access_id:access_key@url?curr_project=project&scheme=http|https", dsn)
34-
}
35-
if pair[1] != "scheme" && pair[1] != currentProject {
36-
return nil, fmt.Errorf("dsn %s 's query is neither scheme or %s", dsn, currentProject)
37-
}
38-
query[pair[1]] = pair[2]
31+
var schemeArgs []string
32+
var currProjArgs []string
33+
var ok bool
34+
queryHints := make(map[string]string)
35+
36+
querys, err := url.ParseQuery(sub[4])
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
if schemeArgs, ok = querys["scheme"]; !ok || len(schemeArgs) != 1 {
42+
return nil, fmt.Errorf("dsn %s should have one scheme argument", dsn)
3943
}
40-
if _, ok := query[currentProject]; !ok {
41-
return nil, fmt.Errorf("dsn %s doesn't have curr_project", dsn)
44+
if currProjArgs, ok = querys[currentProject]; !ok || len(currProjArgs) != 1 {
45+
return nil, fmt.Errorf("dsn %s should have one current_project argument", dsn)
4246
}
43-
if _, ok := query["scheme"]; !ok {
44-
return nil, fmt.Errorf("dsn %s doesn't have scheme", dsn)
47+
48+
for k, v := range querys {
49+
// The query args such as hints_odps.sql.mapper.split_size=16
50+
// would be converted to the maxcompute query hints: {"odps.sql.mapper.split_size": "16"}
51+
if strings.HasPrefix(k, "hints_") {
52+
queryHints[k[6:]] = v[0]
53+
}
4554
}
46-
if query["scheme"] != "http" && query["scheme"] != "https" {
55+
56+
if schemeArgs[0] != "http" && schemeArgs[0] != "https" {
4757
return nil, fmt.Errorf("dsn %s 's scheme is neither http nor https", dsn)
4858
}
4959

5060
config := &Config{
51-
AccessID: id,
52-
AccessKey: key,
53-
Project: query[currentProject],
54-
Endpoint: query["scheme"] + "://" + endpointURL}
61+
AccessID: id,
62+
AccessKey: key,
63+
Project: currProjArgs[0],
64+
Endpoint: schemeArgs[0] + "://" + endpointURL,
65+
QueryHints: queryHints}
5566

5667
return config, nil
5768
}
@@ -62,6 +73,12 @@ func (cfg *Config) FormatDSN() string {
6273
return ""
6374
}
6475
scheme, endpointURL := pair[0], pair[1]
65-
return fmt.Sprintf("%s:%s@%s?curr_project=%s&scheme=%s",
76+
dsnFormt := fmt.Sprintf("%s:%s@%s?curr_project=%s&scheme=%s",
6677
cfg.AccessID, cfg.AccessKey, endpointURL, cfg.Project, scheme)
78+
if len(cfg.QueryHints) != 0 {
79+
for k, v := range cfg.QueryHints {
80+
dsnFormt = fmt.Sprintf("%s&hints_%s=%v", dsnFormt, k, v)
81+
}
82+
}
83+
return dsnFormt
6784
}

dsn_test.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import (
99
func TestConfig_ParseDSN(t *testing.T) {
1010
a := assert.New(t)
1111

12-
correct := "access_id:[email protected]/api?curr_project=test_ci&scheme=http"
12+
correct := "access_id:[email protected]/api?curr_project=test_ci&scheme=http&hints_odps.sql.mapper.split_size=16"
1313
config, err := ParseDSN(correct)
1414
a.NoError(err)
1515
a.Equal("access_id", config.AccessID)
1616
a.Equal("access_key", config.AccessKey)
1717
a.Equal("test_ci", config.Project)
1818
a.Equal("http://service.com/api", config.Endpoint)
19+
a.Equal("16", config.QueryHints["odps.sql.mapper.split_size"])
1920

2021
badDSN := []string{
2122
"", // empty
@@ -46,11 +47,13 @@ func TestConfig_ParseDSN(t *testing.T) {
4647
func TestConfig_FormatDSN(t *testing.T) {
4748
a := assert.New(t)
4849
config := Config{
49-
AccessID: "access_id",
50-
AccessKey: "access_key",
51-
Project: "test_ci",
52-
Endpoint: "http://service.com/api"}
53-
a.Equal("access_id:[email protected]/api?curr_project=test_ci&scheme=http", config.FormatDSN())
50+
AccessID: "access_id",
51+
AccessKey: "access_key",
52+
Project: "test_ci",
53+
Endpoint: "http://service.com/api",
54+
QueryHints: map[string]string{"odps.sql.mapper.split_size": "16"}}
55+
a.Equal("access_id:[email protected]/api?curr_project="+
56+
"test_ci&scheme=http&hints_odps.sql.mapper.split_size=16", config.FormatDSN())
5457
}
5558

5659
func TestConfig_ParseAndFormatRoundTrip(t *testing.T) {

0 commit comments

Comments
 (0)