forked from src-d/gitbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession.go
264 lines (218 loc) · 6.67 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package gitbase
import (
"context"
"strings"
"sync"
"time"
bblfsh "github.com/bblfsh/go-client/v4"
"github.com/sirupsen/logrus"
"github.com/src-d/go-mysql-server/server"
"github.com/src-d/go-mysql-server/sql"
"google.golang.org/grpc/connectivity"
errors "gopkg.in/src-d/go-errors.v1"
"vitess.io/vitess/go/mysql"
)
// Session is the custom implementation of a gitbase session. It embeds the
// base sql.Session and adds the repository pool, the git error handling flag
// and a bblfsh client that is created the first time BblfshClient is called.
type Session struct {
// Session is the embedded base SQL session.
sql.Session
// Pool is the repository pool given to NewSession.
Pool *RepositoryPool
// bblfshMu is held by BblfshClient and Close while they touch bblfshClient.
bblfshMu sync.Mutex
// bblfshEndpoint is the address used to connect to the bblfsh server.
bblfshEndpoint string
// bblfshClient is nil until BblfshClient creates it.
bblfshClient *BblfshClient
// SkipGitErrors is set by WithSkipGitErrors and read by shouldSkipErrors.
SkipGitErrors bool
}
// getSession extracts the gitbase *Session carried by ctx.
//
// It returns ErrInvalidContext when ctx or the session it carries is nil, and
// ErrInvalidGitbaseSession when the session is not a *Session.
func getSession(ctx *sql.Context) (*Session, error) {
	if ctx == nil || ctx.Session == nil {
		return nil, ErrInvalidContext.New(ctx)
	}

	switch sess := ctx.Session.(type) {
	case *Session:
		return sess, nil
	default:
		return nil, ErrInvalidGitbaseSession.New(ctx.Session)
	}
}
const (
// bblfshEndpointKey is the name NewSession passes to getStringEnv to look
// up the bblfsh endpoint.
bblfshEndpointKey = "BBLFSH_ENDPOINT"
// defaultBblfshEndpoint is the fallback value NewSession passes to
// getStringEnv.
defaultBblfshEndpoint = "127.0.0.1:9432"
)
// SessionOption is a function that configures a Session. NewSession applies
// the options in the order they are passed, so a later option overrides an
// earlier one that sets the same field.
type SessionOption func(*Session)
// WithBblfshEndpoint returns an option that sets the bblfsh endpoint the
// session connects to.
func WithBblfshEndpoint(endpoint string) SessionOption {
	setEndpoint := func(sess *Session) {
		sess.bblfshEndpoint = endpoint
	}
	return setEndpoint
}
// WithSkipGitErrors returns an option that sets the session's SkipGitErrors
// flag to enabled.
func WithSkipGitErrors(enabled bool) SessionOption {
	setSkip := func(sess *Session) {
		sess.SkipGitErrors = enabled
	}
	return setSkip
}
// WithBaseSession returns an option that replaces the embedded base
// sql.Session of the gitbase session with sess.
func WithBaseSession(sess sql.Session) SessionOption {
	setBase := func(target *Session) {
		target.Session = sess
	}
	return setBase
}
// NewSession builds a Session backed by the given repository pool.
//
// The session starts with a fresh base session and the bblfsh endpoint
// resolved through getStringEnv; the options are then applied in order and
// may override any of those defaults.
func NewSession(pool *RepositoryPool, opts ...SessionOption) *Session {
	s := new(Session)
	s.Session = sql.NewBaseSession()
	s.Pool = pool
	s.bblfshEndpoint = getStringEnv(bblfshEndpointKey, defaultBblfshEndpoint)

	for i := range opts {
		opts[i](s)
	}

	return s
}
// bblfshMaxAttempts limits the retries performed by Session.BblfshClient: it
// gives up once its consecutive-attempt counter exceeds this value or its
// total-attempt counter exceeds three times this value.
const bblfshMaxAttempts = 10
// BblfshClient is a wrapper around a bblfsh client to extend its
// functionality.
type BblfshClient struct {
// Client is the embedded bblfsh client; its methods are promoted.
*bblfsh.Client
// supportedLanguages caches the languages and aliases collected by
// SupportedLanguages; it is empty until the first successful request.
supportedLanguages []string
}
// IsLanguageSupported reports whether lang is one of the languages (or
// aliases) returned by SupportedLanguages. The comparison is
// case-insensitive.
//
// Any error from SupportedLanguages is returned unchanged together with
// false.
func (c *BblfshClient) IsLanguageSupported(ctx context.Context, lang string) (bool, error) {
	langs, err := c.SupportedLanguages(ctx)
	if err != nil {
		return false, err
	}

	for _, supported := range langs {
		// EqualFold compares case-insensitively without allocating two
		// lower-cased copies per iteration.
		if strings.EqualFold(supported, lang) {
			return true, nil
		}
	}

	return false, nil
}
// SupportedLanguages returns the languages, followed by their aliases,
// reported by the driver manifests of the bblfsh server this client is
// connected to.
//
// A non-empty result is cached on the client and returned directly by later
// calls; an empty result is not cached, so the server is asked again.
func (c *BblfshClient) SupportedLanguages(ctx context.Context) ([]string, error) {
	if len(c.supportedLanguages) != 0 {
		return c.supportedLanguages, nil
	}

	req := c.Client.NewSupportedLanguagesRequest().Context(ctx)
	manifests, err := req.DoV2()
	if err != nil {
		return nil, err
	}

	for _, manifest := range manifests {
		c.supportedLanguages = append(c.supportedLanguages, manifest.Language)
		c.supportedLanguages = append(c.supportedLanguages, manifest.Aliases...)
	}

	return c.supportedLanguages, nil
}
// Parse builds a parse request for content in the given language, bound to
// ctx, and returns the result of the request's UAST call.
func (c *BblfshClient) Parse(
	ctx context.Context,
	lang string,
	content []byte,
) (bblfsh.Node, string, error) {
	req := c.NewParseRequest()
	req = req.Language(lang)
	req = req.Content(string(content))
	req = req.Context(ctx)

	return req.UAST()
}
// ParseWithMode behaves like Parse but also sets the given mode on the parse
// request before sending it.
func (c *BblfshClient) ParseWithMode(
	ctx context.Context,
	mode bblfsh.Mode,
	lang string,
	content []byte,
) (bblfsh.Node, string, error) {
	req := c.NewParseRequest()
	req = req.Mode(mode)
	req = req.Language(lang)
	req = req.Content(string(content))
	req = req.Context(ctx)

	return req.UAST()
}
// BblfshClient returns the bblfsh client of the session, connecting on first
// use and reconnecting when the current connection is neither ready, idle
// nor connecting.
//
// While the connection is in the connecting state the call sleeps 100ms and
// polls again. It returns ErrBblfshConnection when the retry budget (see
// bblfshMaxAttempts) is exhausted, when the stale connection cannot be
// closed, or when a new connection cannot be established.
//
// NOTE: the session mutex is held for the whole call, including the sleeps.
func (s *Session) BblfshClient() (*BblfshClient, error) {
	s.bblfshMu.Lock()
	defer s.bblfshMu.Unlock()

	if s.bblfshClient == nil {
		client, err := connectToBblfsh(s.bblfshEndpoint)
		if err != nil {
			return nil, err
		}

		s.bblfshClient = &BblfshClient{Client: client}
	}

	// attempts counts consecutive reconnections (it is reset whenever the
	// connection is seen connecting); totalAttempts counts every iteration.
	var attempts, totalAttempts int
	for {
		if attempts > bblfshMaxAttempts || totalAttempts > 3*bblfshMaxAttempts {
			return nil, ErrBblfshConnection.New("max attempts exceeded")
		}

		switch s.bblfshClient.GetState() {
		case connectivity.Ready, connectivity.Idle:
			return s.bblfshClient, nil
		case connectivity.Connecting:
			attempts = 0
			logrus.WithField("attempts", totalAttempts).
				Debug("bblfsh is connecting, sleeping 100ms")
			time.Sleep(100 * time.Millisecond)
		default:
			// Drop the reference before closing: if closing or reconnecting
			// fails, the session must not keep pointing at a dead client,
			// otherwise every later call would only fail closing it again.
			stale := s.bblfshClient
			s.bblfshClient = nil
			if err := stale.Close(); err != nil {
				return nil, ErrBblfshConnection.New(err)
			}

			logrus.Debug("bblfsh connection is closed, opening a new one")
			client, err := connectToBblfsh(s.bblfshEndpoint)
			if err != nil {
				return nil, err
			}

			s.bblfshClient = &BblfshClient{Client: client}
		}

		attempts++
		totalAttempts++
	}
}
// Close implements the io.Closer interface. It closes the bblfsh client if
// one was created and is a no-op otherwise.
func (s *Session) Close() error {
	s.bblfshMu.Lock()
	defer s.bblfshMu.Unlock()

	if s.bblfshClient == nil {
		return nil
	}

	return s.bblfshClient.Close()
}
// connectToBblfsh opens a new bblfsh client against endpoint. Any failure is
// wrapped in ErrBblfshConnection.
func connectToBblfsh(endpoint string) (*bblfsh.Client, error) {
	client, err := bblfsh.NewClient(endpoint)
	if err != nil {
		// Every error, deadline exceeded included, is reported the same way,
		// so no special-casing is needed.
		return nil, ErrBblfshConnection.New(err)
	}

	return client, nil
}
// NewSessionBuilder creates a SessionBuilder with the given Repository Pool.
//
// Every session it builds receives the given options followed by a
// WithBaseSession option wrapping a base session created from the
// connection's host, remote address, user and connection ID.
func NewSessionBuilder(pool *RepositoryPool, opts ...SessionOption) server.SessionBuilder {
	return func(c *mysql.Conn, host string) sql.Session {
		base := sql.NewSession(host, c.RemoteAddr().String(), c.User, c.ConnectionID)

		// Build a fresh slice per connection. Appending to the captured opts
		// would grow it on every call and race between concurrent
		// connections.
		sessOpts := make([]SessionOption, 0, len(opts)+1)
		sessOpts = append(sessOpts, opts...)
		sessOpts = append(sessOpts, WithBaseSession(base))

		return NewSession(pool, sessOpts...)
	}
}
// ErrSessionCanceled is returned when the session context is canceled.
var ErrSessionCanceled = errors.NewKind("session canceled")
// ErrInvalidGitbaseSession is returned when some node expected a gitbase
// session but received something else. Its argument is the received session,
// whose type is printed.
var ErrInvalidGitbaseSession = errors.NewKind("expecting gitbase session, but received: %T")
// ErrInvalidContext is returned when some node expected an sql.Context
// with a gitbase session but received something else. Its argument is the
// received context.
var ErrInvalidContext = errors.NewKind("invalid context received: %v")
// ErrBblfshConnection is returned when it's impossible to connect to bblfsh.
// Its argument is the underlying cause.
var ErrBblfshConnection = errors.NewKind("unable to establish a connection with the bblfsh server: %s")
// shouldSkipErrors reports the SkipGitErrors flag of the gitbase session
// carried by ctx. It returns false when no gitbase session can be obtained
// from ctx.
func shouldSkipErrors(ctx *sql.Context) bool {
	if sess, err := getSession(ctx); err == nil {
		return sess.SkipGitErrors
	}

	return false
}