@@ -34,7 +34,6 @@ import (
3434 "regexp"
3535 "strconv"
3636 "strings"
37- "sync"
3837
3938 "github.com/arduino/go-properties-orderedmap"
4039)
@@ -86,14 +85,13 @@ type ErrorCallback func(err string)
8685// it must be created using the NewDiscoveryServer function.
8786type DiscoveryServer struct {
8887 impl Discovery
89- out io.Writer
90- outMutex sync.Mutex
88+ outputChan chan * message
9189 userAgent string
9290 reqProtocolVersion int
9391 initialized bool
9492 started bool
9593 syncStarted bool
96- syncChannel chan interface {}
94+ syncChannel chan * message
9795 cachedPorts map [string ]* Port
9896 cachedErr string
9997}
@@ -103,7 +101,8 @@ type DiscoveryServer struct {
103101// use the Run method.
104102func NewDiscoveryServer (impl Discovery ) * DiscoveryServer {
105103 return & DiscoveryServer {
106- impl : impl ,
104+ impl : impl ,
105+ outputChan : make (chan * message ),
107106 }
108107}
109108
@@ -113,20 +112,21 @@ func NewDiscoveryServer(impl Discovery) *DiscoveryServer {
113112// the input stream is closed. In case of IO error the error is
114113// returned.
115114func (d * DiscoveryServer ) Run (in io.Reader , out io.Writer ) error {
116- d .out = out
115+ go d .outputProcessor (out )
116+ defer close (d .outputChan )
117117 reader := bufio .NewReader (in )
118118 for {
119119 fullCmd , err := reader .ReadString ('\n' )
120120 if err != nil {
121- d .outputError ("command_error" , err .Error ())
121+ d .outputChan <- messageError ("command_error" , err .Error ())
122122 return err
123123 }
124124 fullCmd = strings .TrimSpace (fullCmd )
125125 split := strings .Split (fullCmd , " " )
126126 cmd := strings .ToUpper (split [0 ])
127127
128128 if ! d .initialized && cmd != "HELLO" && cmd != "QUIT" {
129- d .outputError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ))
129+ d .outputChan <- messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ))
130130 continue
131131 }
132132
@@ -143,61 +143,61 @@ func (d *DiscoveryServer) Run(in io.Reader, out io.Writer) error {
143143 d .stop ()
144144 case "QUIT" :
145145 d .impl .Quit ()
146- d .outputOk ("quit" )
146+ d .outputChan <- messageOk ("quit" )
147147 return nil
148148 default :
149- d .outputError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ))
149+ d .outputChan <- messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ))
150150 }
151151 }
152152}
153153
154154func (d * DiscoveryServer ) hello (cmd string ) {
155155 if d .initialized {
156- d .outputError ("hello" , "HELLO already called" )
156+ d .outputChan <- messageError ("hello" , "HELLO already called" )
157157 return
158158 }
159159 re := regexp .MustCompile (`^(\d+) "([^"]+)"$` )
160160 matches := re .FindStringSubmatch (cmd )
161161 if len (matches ) != 3 {
162- d .outputError ("hello" , "Invalid HELLO command" )
162+ d .outputChan <- messageError ("hello" , "Invalid HELLO command" )
163163 return
164164 }
165165 d .userAgent = matches [2 ]
166166 if v , err := strconv .ParseInt (matches [1 ], 10 , 64 ); err != nil {
167- d .outputError ("hello" , "Invalid protocol version: " + matches [2 ])
167+ d .outputChan <- messageError ("hello" , "Invalid protocol version: " + matches [2 ])
168168 return
169169 } else {
170170 d .reqProtocolVersion = int (v )
171171 }
172172 if err := d .impl .Hello (d .userAgent , 1 ); err != nil {
173- d .outputError ("hello" , err .Error ())
173+ d .outputChan <- messageError ("hello" , err .Error ())
174174 return
175175 }
176- d .output ( & genericMessageJSON {
176+ d .outputChan <- & message {
177177 EventType : "hello" ,
178178 ProtocolVersion : 1 , // Protocol version 1 is the only supported for now...
179179 Message : "OK" ,
180- })
180+ }
181181 d .initialized = true
182182}
183183
184184func (d * DiscoveryServer ) start () {
185185 if d .started {
186- d .outputError ("start" , "Discovery already STARTed" )
186+ d .outputChan <- messageError ("start" , "Discovery already STARTed" )
187187 return
188188 }
189189 if d .syncStarted {
190- d .outputError ("start" , "Discovery already START_SYNCed, cannot START" )
190+ d .outputChan <- messageError ("start" , "Discovery already START_SYNCed, cannot START" )
191191 return
192192 }
193193 d .cachedPorts = map [string ]* Port {}
194194 d .cachedErr = ""
195195 if err := d .impl .StartSync (d .eventCallback , d .errorCallback ); err != nil {
196- d .outputError ("start" , "Cannot START: " + err .Error ())
196+ d .outputChan <- messageError ("start" , "Cannot START: " + err .Error ())
197197 return
198198 }
199199 d .started = true
200- d .outputOk ("start" )
200+ d .outputChan <- messageOk ("start" )
201201}
202202
203203func (d * DiscoveryServer ) eventCallback (event string , port * Port ) {
@@ -216,65 +216,61 @@ func (d *DiscoveryServer) errorCallback(msg string) {
216216
217217func (d * DiscoveryServer ) list () {
218218 if ! d .started {
219- d .outputError ("list" , "Discovery not STARTed" )
219+ d .outputChan <- messageError ("list" , "Discovery not STARTed" )
220220 return
221221 }
222222 if d .syncStarted {
223- d .outputError ("list" , "discovery already START_SYNCed, LIST not allowed" )
223+ d .outputChan <- messageError ("list" , "discovery already START_SYNCed, LIST not allowed" )
224224 return
225225 }
226226 if d .cachedErr != "" {
227- d .outputError ("list" , d .cachedErr )
227+ d .outputChan <- messageError ("list" , d .cachedErr )
228228 return
229229 }
230230 ports := []* Port {}
231231 for _ , port := range d .cachedPorts {
232232 ports = append (ports , port )
233233 }
234- type listOutputJSON struct {
235- EventType string `json:"eventType"`
236- Ports []* Port `json:"ports"`
237- }
238- d .output (& listOutputJSON {
234+ d .outputChan <- & message {
239235 EventType : "list" ,
240- Ports : ports ,
241- })
236+ Ports : & ports ,
237+ }
242238}
243239
244240func (d * DiscoveryServer ) startSync () {
245241 if d .syncStarted {
246- d .outputError ("start_sync" , "Discovery already START_SYNCed" )
242+ d .outputChan <- messageError ("start_sync" , "Discovery already START_SYNCed" )
247243 return
248244 }
249245 if d .started {
250- d .outputError ("start_sync" , "Discovery already STARTed, cannot START_SYNC" )
246+ d .outputChan <- messageError ("start_sync" , "Discovery already STARTed, cannot START_SYNC" )
251247 return
252248 }
253- c := make (chan interface {} , 10 ) // buffer up to 10 events
249+ c := make (chan * message , 10 ) // buffer up to 10 events
254250 d .syncChannel = c
255251 if err := d .impl .StartSync (d .syncEvent , d .errorEvent ); err != nil {
256- d .outputError ("start_sync" , "Cannot START_SYNC: " + err .Error ())
252+ d .outputChan <- messageError ("start_sync" , "Cannot START_SYNC: " + err .Error ())
257253 close (d .syncChannel ) // do not leak channel...
258254 d .syncChannel = nil
259255 return
260256 }
261257 d .syncStarted = true
262- d .outputOk ("start_sync" )
258+ d .outputChan <- messageOk ("start_sync" )
263259
264260 go func () {
265261 for e := range c {
266- d .output ( e )
262+ d .outputChan <- e
267263 }
268264 }()
269265}
270266
271267func (d * DiscoveryServer ) stop () {
272268 if ! d .syncStarted && ! d .started {
273- d .outputError ("stop" , "Discovery already STOPped" )
269+ d .outputChan <- messageError ("stop" , "Discovery already STOPped" )
274270 return
275271 }
276272 if err := d .impl .Stop (); err != nil {
277- d .outputError ("stop" , "Cannot STOP: " + err .Error ())
273+ d .outputChan <- messageError ("stop" , "Cannot STOP: " + err .Error ())
278274 return
279275 }
280276 d .started = false
@@ -283,66 +279,31 @@ func (d *DiscoveryServer) stop() {
283279 d .syncChannel = nil
284280 d .syncStarted = false
285281 }
286- d .outputOk ("stop" )
282+ d .outputChan <- messageOk ("stop" )
287283}
288284
289285func (d * DiscoveryServer ) syncEvent (event string , port * Port ) {
290- type syncOutputJSON struct {
291- EventType string `json:"eventType"`
292- Port * Port `json:"port"`
293- }
294- d .syncChannel <- & syncOutputJSON {
286+ d .syncChannel <- & message {
295287 EventType : event ,
296288 Port : port ,
297289 }
298290}
299291
300292func (d * DiscoveryServer ) errorEvent (msg string ) {
301- type syncOutputJSON struct {
302- EventType string `json:"eventType"`
303- Error bool `json:"error"`
304- Message string `json:"message"`
305- }
306- d .syncChannel <- & syncOutputJSON {
307- EventType : "start_sync" ,
308- Error : true ,
309- Message : msg ,
310- }
311- }
312-
313- type genericMessageJSON struct {
314- EventType string `json:"eventType"`
315- Message string `json:"message"`
316- Error bool `json:"error,omitempty"`
317- ProtocolVersion int `json:"protocolVersion,omitempty"`
293+ d .syncChannel <- messageError ("start_sync" , msg )
318294}
319295
320- func (d * DiscoveryServer ) outputOk (event string ) {
321- d .output (& genericMessageJSON {
322- EventType : event ,
323- Message : "OK" ,
324- })
325- }
326-
327- func (d * DiscoveryServer ) outputError (event , msg string ) {
328- d .output (& genericMessageJSON {
329- EventType : event ,
330- Error : true ,
331- Message : msg ,
332- })
333- }
334-
335- func (d * DiscoveryServer ) output (msg interface {}) {
336- data , err := json .MarshalIndent (msg , "" , " " )
337- if err != nil {
338- d .output (& genericMessageJSON {
339- EventType : "command_error" ,
340- Error : true ,
341- Message : err .Error (),
342- })
343- } else {
344- d .outMutex .Lock ()
345- fmt .Fprintln (d .out , string (data ))
346- d .outMutex .Unlock ()
347- }
296+ func (d * DiscoveryServer ) outputProcessor (outWriter io.Writer ) {
297+ // Start go routine to serialize messages printing
298+ go func () {
299+ for msg := range d .outputChan {
300+ data , err := json .MarshalIndent (msg , "" , " " )
301+ if err != nil {
302+ // We are certain that this will be marshalled correctly
303+ // so we don't handle the error
304+ data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
305+ }
306+ fmt .Fprintln (outWriter , string (data ))
307+ }
308+ }()
348309}
0 commit comments