Skip to content

Commit

Permalink
Log how much data we're sending over websockets per app (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwwoelfel authored Nov 26, 2024
1 parent cdf6300 commit e972fa3
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 80 deletions.
6 changes: 3 additions & 3 deletions server/src/instant/dash/routes.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@
{:undertow/websocket
{:on-open (fn [{:keys [channel]}]
(tracer/with-span! {:name "ws-play/on-open" :attributes {:id id}}
(ws/send-json! (format "[%s] ok" id) channel)))
(ws/send-json! nil (format "[%s] ok" id) channel)))
:on-message (fn [{:keys [^WebSocketChannel channel data]}]
(tracer/with-span! {:name "ws-play/on-message" :attributes {:id id :data data}}
(condp = (string/trim data)
Expand All @@ -1161,8 +1161,8 @@
"throw-err"
(tracer/with-span! {:name "ws-play/throw-err" :attributes {:id id}}
(do (.close channel)
(ws/send-json! "this can't send" channel)))
(ws/send-json! (format "[%s] received %s" id data) channel))))
(ws/send-json! nil "this can't send" channel)))
(ws/send-json! nil (format "[%s] received %s" id data) channel))))

:on-close (fn [_]
(tracer/record-info! {:name "ws-play/on-close" :attributes {:id id}}))
Expand Down
31 changes: 16 additions & 15 deletions server/src/instant/lib/ring/websocket.clj
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@

(defn send-json!
"Serializes `obj` to json, and sends over a websocket."
[obj {:keys [websocket-stub undertow-websocket send-lock]}]
[app-id obj {:keys [websocket-stub undertow-websocket send-lock]}]
;; Websockets/sendText _should_ be thread-safe
;; But, t becomes thread-unsafe when we use per-message-deflate
;; Using a `send-lock` to make `send-json!` thread-safe
Expand All @@ -196,20 +196,21 @@
(let [obj-json (->json obj)
p (promise)
_ (try
(tracer/add-data!
{:attributes
{:send-lock.queue-length (.getQueueLength send-lock)
:send-lock.is-locked (.isLocked send-lock)
:send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}})
(.lock send-lock)
(WebSockets/sendText
^String obj-json
^WebSocketChannel undertow-websocket
(proxy [WebSocketCallback] []
(complete [ws-conn context]
(deliver p nil))
(onError [ws-conn context throwable]
(deliver p throwable))))
(tracer/with-span! {:name "ws/send-json!"
:attributes {:size (count obj-json)
:app-id app-id
:send-lock.queue-length (.getQueueLength send-lock)
:send-lock.is-locked (.isLocked send-lock)
:send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}}
(.lock send-lock)
(WebSockets/sendText
^String obj-json
^WebSocketChannel undertow-websocket
(proxy [WebSocketCallback] []
(complete [ws-conn context]
(deliver p nil))
(onError [ws-conn context throwable]
(deliver p throwable)))))
(finally
(.unlock send-lock)))
ret @p]
Expand Down
2 changes: 2 additions & 0 deletions server/src/instant/reactive/ephemeral.clj
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
:when q]
(receive-queue/enqueue->receive-q q
{:op :refresh-presence
:app-id (:app-id room-key)
:room-id room-id
:data room-data
:session-id sess-id}))))
Expand Down Expand Up @@ -214,6 +215,7 @@
:when q]
(receive-queue/enqueue->receive-q q
{:op :refresh-presence
:app-id app-id
:room-id room-id
:data room-data
:session-id sess-id})))))))
Expand Down
90 changes: 50 additions & 40 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@
(rs/set-session-props! store-conn sess-id {:auth auth
:creator creator
:versions versions})
(rs/send-event! store-conn sess-id {:op :init-ok
:session-id sess-id
:client-event-id client-event-id
:auth auth
:attrs attrs})))
(rs/send-event! store-conn app-id sess-id {:op :init-ok
:session-id sess-id
:client-event-id client-event-id
:auth auth
:attrs attrs})))

(defn- get-auth! [store-conn sess-id]
(let [auth (rs/get-auth @store-conn sess-id)]
Expand All @@ -120,8 +120,8 @@

(cond
(contains? instaql-queries q)
(rs/send-event! store-conn sess-id {:op :add-query-exists :q q
:client-event-id client-event-id})
(rs/send-event! store-conn (:id app) sess-id {:op :add-query-exists :q q
:client-event-id client-event-id})

:else
(let [return-type (keyword (or return-type "join-rows"))
Expand All @@ -138,15 +138,15 @@
:admin? admin?
:current-user user}
{:keys [instaql-result]} (rq/instaql-query-reactive! store-conn ctx q return-type)]
(rs/send-event! store-conn sess-id {:op :add-query-ok :q q :result instaql-result
:processed-tx-id processed-tx-id
:client-event-id client-event-id})))))
(rs/send-event! store-conn app-id sess-id {:op :add-query-ok :q q :result instaql-result
:processed-tx-id processed-tx-id
:client-event-id client-event-id})))))

(defn- handle-remove-query! [store-conn sess-id {:keys [q client-event-id] :as _event}]
(let [{:keys [app]} (get-auth! store-conn sess-id)]
(rs/remove-query! store-conn sess-id (:id app) q)
(rs/send-event! store-conn sess-id {:op :remove-query-ok :q q
:client-event-id client-event-id})))
(rs/send-event! store-conn (:id app) sess-id {:op :remove-query-ok :q q
:client-event-id client-event-id})))

(defn- recompute-instaql-query!
[{:keys [store-conn current-user app-id sess-id attrs table-info admin?]}
Expand Down Expand Up @@ -196,10 +196,10 @@
:num-spam num-spam
:num-computations num-computations}}
(when (seq computations)
(rs/send-event! store-conn sess-id {:op :refresh-ok
:processed-tx-id processed-tx-id
:attrs attrs
:computations computations})))))
(rs/send-event! store-conn app-id sess-id {:op :refresh-ok
:processed-tx-id processed-tx-id
:attrs attrs
:computations computations})))))

;; -----
;; transact
Expand All @@ -220,7 +220,7 @@
:datalog-query-fn d/query
:attrs (attr-model/get-by-app-id app-id)}
coerced)]
(rs/send-event! store-conn sess-id
(rs/send-event! store-conn app-id sess-id
{:op :transact-ok
:tx-id tx-id
:client-event-id client-event-id})))
Expand All @@ -229,12 +229,14 @@
;; error

(defn handle-error! [store-conn sess-id {:keys [status
app-id
client-event-id
original-event
type
message
hint]}]
(rs/send-event! store-conn
app-id
sess-id
{:op :error
:status status
Expand Down Expand Up @@ -282,17 +284,17 @@
app-id (-> auth :app :id)
current-user (-> auth :user)]
(eph/join-room! eph-store-atom app-id sess-id current-user room-id)
(rs/send-event! store-conn sess-id {:op :join-room-ok
:room-id room-id
:client-event-id client-event-id})))
(rs/send-event! store-conn app-id sess-id {:op :join-room-ok
:room-id room-id
:client-event-id client-event-id})))

(defn- handle-leave-room! [store-conn eph-store-atom sess-id {:keys [client-event-id room-id] :as _event}]
(let [auth (get-auth! store-conn sess-id)
app-id (-> auth :app :id)]
(eph/leave-room! eph-store-atom app-id sess-id room-id)
(rs/send-event! store-conn sess-id {:op :leave-room-ok
:room-id room-id
:client-event-id client-event-id})))
(rs/send-event! store-conn app-id sess-id {:op :leave-room-ok
:room-id room-id
:client-event-id client-event-id})))

(defn assert-in-room! [store-v app-id room-id sess-id]
(when-not (eph/in-room? store-v app-id room-id sess-id)
Expand All @@ -307,15 +309,15 @@
app-id (-> auth :app :id)
_ (assert-in-room! @eph-store-atom app-id room-id sess-id)]
(eph/set-presence! eph-store-atom app-id sess-id room-id data)
(rs/send-event! store-conn sess-id {:op :set-presence-ok
:room-id room-id
:client-event-id client-event-id})))
(rs/send-event! store-conn app-id sess-id {:op :set-presence-ok
:room-id room-id
:client-event-id client-event-id})))

(defn- handle-refresh-presence!
[store-conn sess-id {:keys [room-id data]}]
(rs/send-event! store-conn sess-id {:op :refresh-presence
:room-id room-id
:data data}))
[store-conn sess-id {:keys [app-id room-id data]}]
(rs/send-event! store-conn app-id sess-id {:op :refresh-presence
:room-id room-id
:data data}))

(defn- handle-client-broadcast!
"Broadcasts a client message to other sessions in the room"
Expand Down Expand Up @@ -343,16 +345,16 @@
(when (seq remote-ids)
(eph/broadcast app-id remote-ids base-msg))

(rs/send-event! store-conn sess-id (assoc base-msg
:op :client-broadcast-ok
:client-event-id client-event-id))))
(rs/send-event! store-conn app-id sess-id (assoc base-msg
:op :client-broadcast-ok
:client-event-id client-event-id))))

(defn- handle-server-broadcast! [store-conn eph-store-atom sess-id {:keys [app-id room-id topic data]}]
(when (eph/in-room? @eph-store-atom app-id room-id sess-id)
(rs/send-event! store-conn sess-id {:op :server-broadcast
:room-id room-id
:topic topic
:data data})))
(rs/send-event! store-conn app-id sess-id {:op :server-broadcast
:room-id room-id
:topic topic
:data data})))

(defn handle-event [store-conn eph-store-atom session event debug-info]
(tracer/with-span! {:name "receive-worker/handle-event"}
Expand All @@ -379,7 +381,7 @@
;; --------------
;; Receive Workers

(defn- handle-instant-exception [session original-event instant-ex debug-info]
(defn- handle-instant-exception [session app-id original-event instant-ex debug-info]
(let [sess-id (:session/id session)
q (:receive-q (:session/socket session))
{:keys [client-event-id]} original-event
Expand All @@ -402,6 +404,7 @@
::ex/validation-failed)
(receive-queue/enqueue->receive-q q
{:op :error
:app-id app-id
:status 400
:client-event-id client-event-id
:original-event (merge original-event
Expand All @@ -421,6 +424,7 @@
(tracer/add-exception! instant-ex {:escaping? false})
(receive-queue/enqueue->receive-q q
{:op :error
:app-id app-id
:status 500
:client-event-id client-event-id
:original-event (merge original-event
Expand All @@ -430,14 +434,15 @@
:hint hint
:session-id sess-id})))))

(defn- handle-uncaught-err [session original-event root-err debug-info]
(defn- handle-uncaught-err [session app-id original-event root-err debug-info]
(let [sess-id (:session/id session)
q (:receive-q (:session/socket session))
{:keys [client-event-id]} original-event]
(tracer/add-exception! root-err {:escaping? false})

(receive-queue/enqueue->receive-q q
{:op :error
:app-id app-id
:client-event-id client-event-id
:status 500
:original-event (merge original-event
Expand Down Expand Up @@ -496,13 +501,18 @@
(tracer/record-info! {:name "caught-throwable"})
(let [original-event event
instant-ex (ex/find-instant-exception e)
root-err (root-cause e)]
root-err (root-cause e)
app-id (some-> (rs/get-auth @store-conn (:session/id session))
:app
:id)]
(cond
instant-ex (handle-instant-exception session
app-id
original-event
instant-ex
@debug-info)
:else (handle-uncaught-err session
app-id
original-event
root-err
@debug-info))))
Expand Down
16 changes: 4 additions & 12 deletions server/src/instant/reactive/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
[datascript.core :as d]
[instant.util.coll :as ucoll]
[instant.lib.ring.websocket :as ws]
[instant.util.async :as ua]
[instant.util.tracer :as tracer]
[instant.util.exception :as ex]))

Expand Down Expand Up @@ -565,20 +564,20 @@
;; -----------------
;; Websocket Helpers

(defn send-event! [conn sess-id event]
(defn send-event! [conn app-id sess-id event]
(let [{:keys [ws-conn]} (get-socket @conn sess-id)]
(when-not ws-conn
(ex/throw-socket-missing! sess-id))
(try
(ws/send-json! event ws-conn)
(ws/send-json! app-id event ws-conn)
(catch java.io.IOException e
(ex/throw-socket-error! sess-id e)))))

(defn try-send-event!
"Does a best-effort send. If it fails, we record and swallow the exception"
[conn sess-id event]
[conn app-id sess-id event]
(try
(send-event! conn sess-id event)
(send-event! conn app-id sess-id event)
(catch Exception e
(tracer/with-span! {:name "rs/try-send-event-swallowed-err"}
(tracer/record-exception-span!
Expand All @@ -587,13 +586,6 @@
:attributes {:event (str event)
:escaping? false}})))))

(defn try-broadcast-event!
"Sends an event to multiple sessions"
[conn sess-ids event]
(ua/vfuture-pmap (fn [sess-id]
(try-send-event! conn sess-id event))
sess-ids))

;; -----
;; start

Expand Down
6 changes: 3 additions & 3 deletions server/src/instant/session_counter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
(into {})))

(defn send-report! [report ws]
(ws/send-json! {:op :report :report report} ws))
(ws/send-json! nil {:op :report :report report} ws))

;; -------
;; Reporter
Expand Down Expand Up @@ -83,8 +83,8 @@
(if (= token api-key)
(do (add-websocket-listener! ws-id channel)
(send-report! (store->report @rs/store-conn) channel))
(ws/send-json! {:op :error
:message "Invalid token"}
(ws/send-json! nil {:op :error
:message "Invalid token"}
channel))))
:on-error (fn [{throwable :error}]
(remove-websocket-listener! ws-id)
Expand Down
14 changes: 8 additions & 6 deletions server/src/instant/util/logging_exporter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@
(def exclude-span?
(if (= :prod (config/get-env))
(fn [span]
(let [attrs (.getAttributes span)]
(when-let [op (.get attrs op-attr-key)]
(or (= op ":set-presence")
(= op ":refresh-presence")
(= op ":server-broadcast")
(= op ":client-broadcast")))))
(let [name (.getName span)
attrs (.getAttributes span)]
(or (= name "ws/send-json!")
(when-let [op (.get attrs op-attr-key)]
(or (= op ":set-presence")
(= op ":refresh-presence")
(= op ":server-broadcast")
(= op ":client-broadcast"))))))
(fn [_span]
false)))

Expand Down
2 changes: 1 addition & 1 deletion server/test/instant/reactive/session_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
*eph-store-atom* eph-store-atom]
(with-redefs [receive-queue/receive-q receive-q
eph/room-refresh-ch room-refresh-ch
ws/send-json! (fn [msg fake-ws-conn]
ws/send-json! (fn [_app-id msg fake-ws-conn]
(a/>!! fake-ws-conn msg))

rq/instaql-query-reactive!
Expand Down

0 comments on commit e972fa3

Please sign in to comment.