diff --git a/pkg/api/api.go b/pkg/api/api.go index c8d8148..9e0121a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -20,47 +20,37 @@ var ( dbClient *bolt.Client ) -// Options holds the basic configuration of the http server -// TODO implement reading options from a config file -type Options struct { - Port string `json:"port"` - RWTimeout int64 `json:"timeout"` - Verbose bool `json:"verbose"` - Debug bool `json:"debug"` - Database *bolt.Options `json:"database"` -} - // Configure takes a http.Server and configures it with the specified Options func Configure(server *http.Server, opts *Options) error { - // setup router - apilog.Println("starting server on :" + opts.Port) - server.Addr = ":" + opts.Port - server.ReadTimeout = time.Duration(opts.RWTimeout) - server.WriteTimeout = time.Duration(opts.RWTimeout) - // TODO can we move this to pkg/session? dbClient = bolt.NewClient(opts.Database) if err := dbClient.Open(); err != nil { apilog.Println("Error opening database:", err) return err } - canvas.Init() + // initialize canvas service + canvas.Init(opts.Verbose) + + // setup router routes, err := setupRoutes(opts) if err != nil { apilog.Println("Error setting up router:", err) return err } + + server.Addr = ":" + opts.Port + server.ReadTimeout = time.Duration(opts.RWTimeout) + server.WriteTimeout = time.Duration(opts.RWTimeout) server.Handler = routes + apilog.Println("starting server on :" + opts.Port) return nil } // Cleanup closes the database client func Cleanup() error { - // TODO notify websocket clients about server shutdown - // close db - dbClient.Close() + dbClient.Close() // close db client if r := recover(); r != nil { err, ok := r.(error) diff --git a/pkg/api/options.go b/pkg/api/options.go new file mode 100644 index 0000000..efcb71f --- /dev/null +++ b/pkg/api/options.go @@ -0,0 +1,13 @@ +package api + +import "github.com/drawr-team/drawrserver/pkg/bolt" + +// Options holds the basic configuration of the http server +// TODO implement reading options from a config file +type Options struct { + Port string `json:"port"` + RWTimeout int64 `json:"timeout"` + Verbose bool `json:"verbose"` + Debug bool `json:"debug"` + Database *bolt.Options `json:"database"` +} diff --git a/pkg/canvas/service.go b/pkg/canvas/service.go index 5310f5a..1336b14 100644 --- a/pkg/canvas/service.go +++ b/pkg/canvas/service.go @@ -11,19 +11,30 @@ import ( var svc canvasService type canvasService struct { + verbose bool + hubs map[string]*websock.Hub } // Init initializes the message service -func Init() { +func Init(verbose bool) { + svc.verbose = verbose svc.hubs = make(map[string]*websock.Hub) } +// Close notifies all hubs about the server going offline +func Close() { + for _, h := range svc.hubs { + h.Close() + } +} + // Connect adds a new client connection to the session hub func Connect(w http.ResponseWriter, r *http.Request, s service.Session) error { _, ok := svc.hubs[s.ID] if !ok { svc.hubs[s.ID] = websock.NewHub() + svc.hubs[s.ID].Verbose = svc.verbose } c, err := websock.Upgrade(w, r, w.Header()) diff --git a/pkg/websock/README.md b/pkg/websock/README.md index e1e0760..97fe8e4 100644 --- a/pkg/websock/README.md +++ b/pkg/websock/README.md @@ -3,9 +3,5 @@ * Hub maps connections to their ID * 1 hub per session * read and write channels for every connection -* read and write channel for hub -* connection worker instead of hub worker - -## Ideas: - -* 1 Hub per server? +* broadcast channel for hub +* connection workers instead of hub worker diff --git a/pkg/websock/connection.go b/pkg/websock/connection.go index 97221bb..1e958bd 100644 --- a/pkg/websock/connection.go +++ b/pkg/websock/connection.go @@ -8,10 +8,9 @@ import ( // Connection wraps a messenger connection type Connection struct { - messenger Messenger - wg *sync.WaitGroup - done bool + ws *websocket.Conn + wg *sync.WaitGroup // connection message channels send chan []byte received chan []byte @@ -21,40 +20,33 @@ type Connection struct { // NewConnection constructs a new Connection from a websocket.Conn func NewConnection(ws *websocket.Conn) *Connection { + println("NewConnection:", ws.LocalAddr()) c := &Connection{ - messenger: WebsocketMessenger{ws}, - wg: new(sync.WaitGroup), - done: false, - send: make(chan []byte, 256), - received: make(chan []byte, 256), + ws: ws, + wg: new(sync.WaitGroup), + send: make(chan []byte), + received: make(chan []byte), } if ws != nil { c.Addr = ws.RemoteAddr().String() } else { c.Addr = "none" } - return c -} -// RunWorkers starts the reader and writer in seperate goroutines -// for the connection and returns a sync.WaitGroup -func (c *Connection) RunWorkers() { - go c.Reader() - go c.Writer() -} + go c.reader() + go c.writer() -// Wait blocks until the Read and Write workers finish -func (c *Connection) Wait() { - c.wg.Wait() -} + c.ws.SetCloseHandler(func(code int, text string) error { + switch code { + case websocket.CloseGoingAway: + println("peer going away") + case websocket.CloseNormalClosure: + println("peer closing normally") + } + return nil + }) -// StopWorkers sends the done signal to the workers -func (c *Connection) Close() error { - close(c.send) - c.done = true - c.wg.Wait() // wait for Reader to finish - close(c.received) - return c.messenger.Close() + return c } // SendChan returns the send channel @@ -67,32 +59,58 @@ func (c *Connection) ReceiveChan() chan []byte { return c.received } +// Close sends the done signal to the workers +func (c *Connection) Close() error { + println("closing connection", c.Addr) + + payload := websocket.FormatCloseMessage(websocket.CloseGoingAway, "server shutting down") + if err := c.ws.WriteMessage(websocket.CloseMessage, payload); err != nil { + return err + } + + println("waiting for workers to quit") + close(c.send) // closing send channel causes writer to stop + c.wg.Wait() + + println("closing connection was successfull") + return c.ws.Close() +} + // Reader reads a message from the websocket connection -func (c *Connection) Reader() { +func (c *Connection) reader() { c.wg.Add(1) - defer c.wg.Done() - - for !c.done { - message, err := c.messenger.ReadMessage() + for { + t, msg, err := c.ws.ReadMessage() if err != nil { + if websocket.IsCloseError(err, websocket.CloseGoingAway) { + println("connection to peer closed as expected") + break + } // TODO handle ReadMessage errors - panic(err) - } else { - c.received <- message + panic("Failed to read message: " + err.Error()) + } + + switch t { + case websocket.TextMessage: + c.received <- msg + case websocket.BinaryMessage: + panic("cannot handle binary message") } } + c.wg.Done() + println("reader ended") } -// Writer writes a message to the websocket connection -func (c *Connection) Writer() { +// writer writes a message to the websocket connection +func (c *Connection) writer() { c.wg.Add(1) - defer c.wg.Done() - - for message := range c.send { - err := c.messenger.WriteMessage(message) + for msg := range c.send { + err := c.ws.WriteMessage(websocket.TextMessage, msg) if err != nil { // TODO handle WriteMessage errors - panic(err) + panic("Failed to write message: " + err.Error()) } } + c.wg.Done() + println("writer ended") } diff --git a/pkg/websock/hub.go b/pkg/websock/hub.go index 8e8ec00..a926c9f 100644 --- a/pkg/websock/hub.go +++ b/pkg/websock/hub.go @@ -5,6 +5,8 @@ import ( "log" "os" "sync" + + "github.com/gorilla/websocket" ) var ErrConnectionNotFound = errors.New("Connection not found") @@ -18,9 +20,10 @@ type Hub struct { connectionsMx sync.RWMutex // TODO hub-wide message channel? - // broadcast chan []byte + broadcast chan []byte - log log.Logger + quit chan chan struct{} + log log.Logger } // NewHub creates a new hub @@ -29,22 +32,41 @@ func NewHub() *Hub { Verbose: false, connections: make(map[string]Connection), connectionsMx: sync.RWMutex{}, - // broadcast: make(chan []byte, 2048), - log: *log.New(os.Stdout, "[websock]", log.LstdFlags), + broadcast: make(chan []byte), + quit: make(chan chan struct{}), + log: *log.New(os.Stdout, "[ws]\t", log.LstdFlags), } + go h.broadcaster() return h } +func (h *Hub) BroadcastChan() chan []byte { + return h.broadcast +} + +// Close sends the quit signal to the monitor worker +func (h *Hub) Close() { + h.log.Println("closing hub") + q := make(chan struct{}) + h.quit <- q + + for cID := range h.connections { + h.RemoveConnection(cID) + } + <-q +} + // AddConnection remembers a connection func (h *Hub) AddConnection(id string, c Connection) { h.connectionsMx.Lock() defer h.connectionsMx.Unlock() + h.connections[id] = c + if h.Verbose { - h.log.Println("new connection:", id) + h.log.Printf("add connection: %v (%v)", id, c.Addr) } - h.connections[id] = c } // RemoveConnection forgets a connection @@ -52,16 +74,18 @@ func (h *Hub) RemoveConnection(id string) { h.connectionsMx.Lock() defer h.connectionsMx.Unlock() - if h.Verbose { - h.log.Println("remove connection:", id) - } - - if c, ok := h.connections[id]; ok { + c, ok := h.connections[id] + if ok { if err := c.Close(); err != nil { - panic(err) + panic("Failed to remove connection" + id + " from hub: " + err.Error()) } delete(h.connections, id) } + + if h.Verbose { + h.log.Printf("remove connection: %v (%v)", id, c.Addr) + } + } func (h *Hub) GetConnection(id string) (*Connection, error) { @@ -72,9 +96,31 @@ func (h *Hub) GetConnection(id string) (*Connection, error) { return &c, nil } -// Broadcast sends a message to all connections -func (h *Hub) Broadcast(m []byte) { - for _, conn := range h.connections { - conn.SendChan() <- m +// broadcast sends a message to all connections +func (h *Hub) sendBroadcastMessage(m []byte) error { + for cID, conn := range h.connections { + h.log.Println("notified:", cID) + pm, err := websocket.NewPreparedMessage(websocket.TextMessage, m) + if err != nil { + return err + } + if err := conn.ws.WritePreparedMessage(pm); err != nil { + return err + } + } + return nil +} + +func (h *Hub) broadcaster() { + h.log.Println("starting broadcaster worker...") + for { + select { + case msg := <-h.broadcast: + h.log.Println("broadcasting:", string(msg)) + h.sendBroadcastMessage(msg) + case q := <-h.quit: + close(q) + return + } } } diff --git a/pkg/websock/messenger.go b/pkg/websock/messenger.go deleted file mode 100644 index a05392e..0000000 --- a/pkg/websock/messenger.go +++ /dev/null @@ -1,44 +0,0 @@ -package websock - -import ( - "io" - - "github.com/gorilla/websocket" -) - -// MessageReader can receive []byte messages -type MessageReader interface { - ReadMessage() ([]byte, error) -} - -// MessageWriter can send []byte messages -type MessageWriter interface { - WriteMessage([]byte) error -} - -// Messenger implements MessageReader and MessageWriter -type Messenger interface { - MessageReader - MessageWriter - io.Closer -} - -// WebsocketMessenger implements the the Messenger interface for websocket.Conn -type WebsocketMessenger struct{ *websocket.Conn } - -// ReadMessage implements MessageReader for websocket.Conn -func (wsm WebsocketMessenger) ReadMessage() (b []byte, err error) { - _, b, err = wsm.Conn.ReadMessage() - return -} - -// WriteMessage implements MessageWriter for websocket.Conn -func (wsm WebsocketMessenger) WriteMessage(b []byte) (err error) { - err = wsm.Conn.WriteMessage(websocket.TextMessage, b) - return -} - -// Close implements Closer for websocket.Close -func (wsm WebsocketMessenger) Close() error { - return wsm.Conn.Close() -}