-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy paththriftserver.go
104 lines (91 loc) · 3.33 KB
/
thriftserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* Copyright © 2017 Xiao Zhang <[email protected]>.
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file.
*/
package turbo
import (
"net/http"
"time"
"github.com/apache/thrift/lib/go/thrift"
)
// ThriftServer embeds *Server and adds the Thrift-specific state used by
// the methods in this file.
type ThriftServer struct {
	*Server
	// tClient is initialised with the Thrift service address when the HTTP
	// server is started, and is where Service looks up client instances.
	tClient *thriftClient
	// thriftServer holds the value returned by startThriftServiceInternal;
	// Stop passes it on to the stop helper.
	thriftServer *thrift.TSimpleServer
}
// NewThriftServer builds a ThriftServer whose configuration is loaded via
// NewConfig("thrift", configFilePath).
//
// When initializer is nil a *defaultInitializer is used instead. Before the
// server is returned its channels are set up with initChans and the logger
// is initialised from the loaded config.
func NewThriftServer(initializer Initializable, configFilePath string) *ThriftServer {
	if initializer == nil {
		initializer = &defaultInitializer{}
	}

	// Assemble the embedded Server first so the outer literal stays short.
	base := &Server{
		Config:       NewConfig("thrift", configFilePath),
		Components:   new(Components),
		reloadConfig: make(chan bool),
		Initializer:  initializer,
	}
	srv := &ThriftServer{
		Server:  base,
		tClient: new(thriftClient),
	}

	srv.initChans()
	initLogger(srv.Config)
	return srv
}
type (
	// thriftClientCreator receives a transport and a protocol factory and
	// returns a map from name to client value. It is handed to tClient.init
	// when the HTTP server is started.
	thriftClientCreator func(trans thrift.TTransport, f thrift.TProtocolFactory) map[string]interface{}

	// processorRegister returns the processors to expose, keyed by the name
	// each one is registered under on the multiplexed processor.
	processorRegister func() map[string]thrift.TProcessor
)
// Start runs the initializer, starts the Thrift service, then starts the
// HTTP server and finally begins watching for config reloads.
//
// clientCreator and sw are forwarded to the HTTP side; registerTProcessor
// is forwarded to the Thrift side.
func (s *ThriftServer) Start(clientCreator thriftClientCreator, sw switcher, registerTProcessor processorRegister) {
	log.Info("Starting Turbo...")
	s.Initializer.InitService(s)

	thriftSrv := s.startThriftServiceInternal(registerTProcessor, false)
	s.thriftServer = thriftSrv

	// Wait one second before bringing up the HTTP side.
	// NOTE(review): presumably this gives the Thrift service goroutine time
	// to begin serving before the Thrift client connects - confirm.
	time.Sleep(time.Second)

	httpSrv := s.startThriftHTTPServerInternal(clientCreator, sw)
	s.httpServer = httpSrv

	watchConfigReload(s)
}
// StartHTTPServer runs the initializer and starts only the HTTP server,
// which sends requests via Thrift, then begins watching for config reloads.
// No Thrift service is started by this method.
func (s *ThriftServer) StartHTTPServer(clientCreator thriftClientCreator, sw switcher) {
	s.Initializer.InitService(s)

	httpSrv := s.startThriftHTTPServerInternal(clientCreator, sw)
	s.httpServer = httpSrv

	watchConfigReload(s)
}
// StartThriftService runs the initializer and starts only the Thrift
// service, keeping the resulting server on s so that Stop can reach it.
// registerTProcessor supplies the processors to serve.
func (s *ThriftServer) StartThriftService(registerTProcessor processorRegister) {
	s.Initializer.InitService(s)

	thriftSrv := s.startThriftServiceInternal(registerTProcessor, true)
	s.thriftServer = thriftSrv
}
// startThriftHTTPServerInternal installs sw as the package-level
// switcherFunc, initialises the Thrift client against the configured
// "host:port" address using clientCreator, and returns the server produced
// by startHTTPServer.
func (s *ThriftServer) startThriftHTTPServerInternal(clientCreator thriftClientCreator, sw switcher) *http.Server {
	log.Info("Starting HTTP Server...")
	switcherFunc = sw

	host := s.Config.ThriftServiceHost()
	port := s.Config.ThriftServicePort()
	s.tClient.init(host+":"+port, clientCreator)

	return startHTTPServer(s)
}
// startThriftServiceInternal opens a server socket on the configured Thrift
// port, registers every processor returned by registerTProcessor on a
// multiplexed processor, and serves them from a TSimpleServer in a
// background goroutine. The server is returned without waiting for Serve
// to finish.
//
// A socket error is passed to logPanicIf; an error returned by Serve is
// passed to panicIf inside the goroutine. The alone parameter is not read
// anywhere in this function.
func (s *ThriftServer) startThriftServiceInternal(registerTProcessor processorRegister, alone bool) *thrift.TSimpleServer {
	port := s.Config.ThriftServicePort()
	log.Infof("Starting Thrift Service at :%s...", port)

	socket, err := thrift.NewTServerSocket(":" + port)
	logPanicIf(err)

	// One multiplexed processor fronts all registered processors by name.
	multiplexed := thrift.NewTMultiplexedProcessor()
	processors := registerTProcessor()
	for name, p := range processors {
		multiplexed.RegisterProcessor(name, p)
	}

	transportFactory := thrift.NewTTransportFactory()
	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
	// todo support start server using other modes
	server := thrift.NewTSimpleServer4(multiplexed, socket, transportFactory, protocolFactory)

	go func() {
		panicIf(server.Serve())
	}()

	log.Info("Thrift Service started")
	return server
}
// Service returns the Thrift client instance stored under serviceName.
// The result is an interface{} that the caller type-asserts, for example:
//
//	client := s.Service("YourService").(proto.YourServiceClient)
//
// log.Panic is called with "thrift connection not initiated!" when s, its
// Thrift client, or the client's service collection is nil.
func (s *ThriftServer) Service(serviceName string) interface{} {
	initiated := s != nil && s.tClient != nil && s.tClient.thriftService != nil
	if !initiated {
		log.Panic("thrift connection not initiated!")
	}

	services := s.tClient.thriftService
	return services[serviceName]
}
// ServerField returns the *Server embedded in s.
func (s *ThriftServer) ServerField() *Server {
	embedded := s.Server
	return embedded
}
// Stop hands s together with its HTTP server and its Thrift server to the
// shared stop helper. The helper's third argument is always nil here.
func (s *ThriftServer) Stop() {
	httpSrv, thriftSrv := s.httpServer, s.thriftServer
	stop(s, httpSrv, nil, thriftSrv)
}