From e972fa3c2391e1bbeb4f84bebd550542d99035ab Mon Sep 17 00:00:00 2001 From: Daniel Woelfel Date: Tue, 26 Nov 2024 05:43:26 -0800 Subject: [PATCH] Log how much data we're sending over websockets per app (#543) --- server/src/instant/dash/routes.clj | 6 +- server/src/instant/lib/ring/websocket.clj | 31 +++---- server/src/instant/reactive/ephemeral.clj | 2 + server/src/instant/reactive/session.clj | 90 ++++++++++--------- server/src/instant/reactive/store.clj | 16 +--- server/src/instant/session_counter.clj | 6 +- server/src/instant/util/logging_exporter.clj | 14 +-- server/test/instant/reactive/session_test.clj | 2 +- 8 files changed, 87 insertions(+), 80 deletions(-) diff --git a/server/src/instant/dash/routes.clj b/server/src/instant/dash/routes.clj index 72224e42d..aaaf4694e 100644 --- a/server/src/instant/dash/routes.clj +++ b/server/src/instant/dash/routes.clj @@ -1151,7 +1151,7 @@ {:undertow/websocket {:on-open (fn [{:keys [channel]}] (tracer/with-span! {:name "ws-play/on-open" :attributes {:id id}} - (ws/send-json! (format "[%s] ok" id) channel))) + (ws/send-json! nil (format "[%s] ok" id) channel))) :on-message (fn [{:keys [^WebSocketChannel channel data]}] (tracer/with-span! {:name "ws-play/on-message" :attributes {:id id :data data}} (condp = (string/trim data) @@ -1161,8 +1161,8 @@ "throw-err" (tracer/with-span! {:name "ws-play/throw-err" :attributes {:id id}} (do (.close channel) - (ws/send-json! "this can't send" channel))) - (ws/send-json! (format "[%s] received %s" id data) channel)))) + (ws/send-json! nil "this can't send" channel))) + (ws/send-json! nil (format "[%s] received %s" id data) channel)))) :on-close (fn [_] (tracer/record-info! {:name "ws-play/on-close" :attributes {:id id}})) diff --git a/server/src/instant/lib/ring/websocket.clj b/server/src/instant/lib/ring/websocket.clj index 6f4eb8252..c477bc687 100644 --- a/server/src/instant/lib/ring/websocket.clj +++ b/server/src/instant/lib/ring/websocket.clj @@ -187,7 +187,7 @@ (defn send-json! "Serializes `obj` to json, and sends over a websocket." - [obj {:keys [websocket-stub undertow-websocket send-lock]}] + [app-id obj {:keys [websocket-stub undertow-websocket send-lock]}] ;; Websockets/sendText _should_ be thread-safe ;; But, t becomes thread-unsafe when we use per-message-deflate ;; Using a `send-lock` to make `send-json!` thread-safe @@ -196,20 +196,21 @@ (let [obj-json (->json obj) p (promise) _ (try - (tracer/add-data! - {:attributes - {:send-lock.queue-length (.getQueueLength send-lock) - :send-lock.is-locked (.isLocked send-lock) - :send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}}) - (.lock send-lock) - (WebSockets/sendText - ^String obj-json - ^WebSocketChannel undertow-websocket - (proxy [WebSocketCallback] [] - (complete [ws-conn context] - (deliver p nil)) - (onError [ws-conn context throwable] - (deliver p throwable)))) + (tracer/with-span! {:name "ws/send-json!" + :attributes {:size (count obj-json) + :app-id app-id + :send-lock.queue-length (.getQueueLength send-lock) + :send-lock.is-locked (.isLocked send-lock) + :send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}} + (.lock send-lock) + (WebSockets/sendText + ^String obj-json + ^WebSocketChannel undertow-websocket + (proxy [WebSocketCallback] [] + (complete [ws-conn context] + (deliver p nil)) + (onError [ws-conn context throwable] + (deliver p throwable))))) (finally (.unlock send-lock))) ret @p] diff --git a/server/src/instant/reactive/ephemeral.clj b/server/src/instant/reactive/ephemeral.clj index 71cdb0d38..2ba6eec67 100644 --- a/server/src/instant/reactive/ephemeral.clj +++ b/server/src/instant/reactive/ephemeral.clj @@ -184,6 +184,7 @@ :when q] (receive-queue/enqueue->receive-q q {:op :refresh-presence + :app-id (:app-id room-key) :room-id room-id :data room-data :session-id sess-id})))) @@ -214,6 +215,7 @@ :when q] (receive-queue/enqueue->receive-q q {:op :refresh-presence + :app-id app-id :room-id room-id :data room-data :session-id sess-id}))))))) diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index 2edbd916d..5f9249624 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -102,11 +102,11 @@ (rs/set-session-props! store-conn sess-id {:auth auth :creator creator :versions versions}) - (rs/send-event! store-conn sess-id {:op :init-ok - :session-id sess-id - :client-event-id client-event-id - :auth auth - :attrs attrs}))) + (rs/send-event! store-conn app-id sess-id {:op :init-ok + :session-id sess-id + :client-event-id client-event-id + :auth auth + :attrs attrs}))) (defn- get-auth! [store-conn sess-id] (let [auth (rs/get-auth @store-conn sess-id)] @@ -120,8 +120,8 @@ (cond (contains? instaql-queries q) - (rs/send-event! store-conn sess-id {:op :add-query-exists :q q - :client-event-id client-event-id}) + (rs/send-event! store-conn (:id app) sess-id {:op :add-query-exists :q q + :client-event-id client-event-id}) :else (let [return-type (keyword (or return-type "join-rows")) @@ -138,15 +138,15 @@ :admin? admin? :current-user user} {:keys [instaql-result]} (rq/instaql-query-reactive! store-conn ctx q return-type)] - (rs/send-event! store-conn sess-id {:op :add-query-ok :q q :result instaql-result - :processed-tx-id processed-tx-id - :client-event-id client-event-id}))))) + (rs/send-event! store-conn app-id sess-id {:op :add-query-ok :q q :result instaql-result + :processed-tx-id processed-tx-id + :client-event-id client-event-id}))))) (defn- handle-remove-query! [store-conn sess-id {:keys [q client-event-id] :as _event}] (let [{:keys [app]} (get-auth! store-conn sess-id)] (rs/remove-query! store-conn sess-id (:id app) q) - (rs/send-event! store-conn sess-id {:op :remove-query-ok :q q - :client-event-id client-event-id}))) + (rs/send-event! store-conn (:id app) sess-id {:op :remove-query-ok :q q + :client-event-id client-event-id}))) (defn- recompute-instaql-query! [{:keys [store-conn current-user app-id sess-id attrs table-info admin?]} @@ -196,10 +196,10 @@ :num-spam num-spam :num-computations num-computations}} (when (seq computations) - (rs/send-event! store-conn sess-id {:op :refresh-ok - :processed-tx-id processed-tx-id - :attrs attrs - :computations computations}))))) + (rs/send-event! store-conn app-id sess-id {:op :refresh-ok + :processed-tx-id processed-tx-id + :attrs attrs + :computations computations}))))) ;; ----- ;; transact @@ -220,7 +220,7 @@ :datalog-query-fn d/query :attrs (attr-model/get-by-app-id app-id)} coerced)] - (rs/send-event! store-conn sess-id + (rs/send-event! store-conn app-id sess-id {:op :transact-ok :tx-id tx-id :client-event-id client-event-id}))) @@ -229,12 +229,14 @@ ;; error (defn handle-error! [store-conn sess-id {:keys [status + app-id client-event-id original-event type message hint]}] (rs/send-event! store-conn + app-id sess-id {:op :error :status status @@ -282,17 +284,17 @@ app-id (-> auth :app :id) current-user (-> auth :user)] (eph/join-room! eph-store-atom app-id sess-id current-user room-id) - (rs/send-event! store-conn sess-id {:op :join-room-ok - :room-id room-id - :client-event-id client-event-id}))) + (rs/send-event! store-conn app-id sess-id {:op :join-room-ok + :room-id room-id + :client-event-id client-event-id}))) (defn- handle-leave-room! [store-conn eph-store-atom sess-id {:keys [client-event-id room-id] :as _event}] (let [auth (get-auth! store-conn sess-id) app-id (-> auth :app :id)] (eph/leave-room! eph-store-atom app-id sess-id room-id) - (rs/send-event! store-conn sess-id {:op :leave-room-ok - :room-id room-id - :client-event-id client-event-id}))) + (rs/send-event! store-conn app-id sess-id {:op :leave-room-ok + :room-id room-id + :client-event-id client-event-id}))) (defn assert-in-room! [store-v app-id room-id sess-id] (when-not (eph/in-room? store-v app-id room-id sess-id) @@ -307,15 +309,15 @@ app-id (-> auth :app :id) _ (assert-in-room! @eph-store-atom app-id room-id sess-id)] (eph/set-presence! eph-store-atom app-id sess-id room-id data) - (rs/send-event! store-conn sess-id {:op :set-presence-ok - :room-id room-id - :client-event-id client-event-id}))) + (rs/send-event! store-conn app-id sess-id {:op :set-presence-ok + :room-id room-id + :client-event-id client-event-id}))) (defn- handle-refresh-presence! - [store-conn sess-id {:keys [room-id data]}] - (rs/send-event! store-conn sess-id {:op :refresh-presence - :room-id room-id - :data data})) + [store-conn sess-id {:keys [app-id room-id data]}] + (rs/send-event! store-conn app-id sess-id {:op :refresh-presence + :room-id room-id + :data data})) (defn- handle-client-broadcast! "Broadcasts a client message to other sessions in the room" @@ -343,16 +345,16 @@ (when (seq remote-ids) (eph/broadcast app-id remote-ids base-msg)) - (rs/send-event! store-conn sess-id (assoc base-msg - :op :client-broadcast-ok - :client-event-id client-event-id)))) + (rs/send-event! store-conn app-id sess-id (assoc base-msg + :op :client-broadcast-ok + :client-event-id client-event-id)))) (defn- handle-server-broadcast! [store-conn eph-store-atom sess-id {:keys [app-id room-id topic data]}] (when (eph/in-room? @eph-store-atom app-id room-id sess-id) - (rs/send-event! store-conn sess-id {:op :server-broadcast - :room-id room-id - :topic topic - :data data}))) + (rs/send-event! store-conn app-id sess-id {:op :server-broadcast + :room-id room-id + :topic topic + :data data}))) (defn handle-event [store-conn eph-store-atom session event debug-info] (tracer/with-span! {:name "receive-worker/handle-event"} @@ -379,7 +381,7 @@ ;; -------------- ;; Receive Workers -(defn- handle-instant-exception [session original-event instant-ex debug-info] +(defn- handle-instant-exception [session app-id original-event instant-ex debug-info] (let [sess-id (:session/id session) q (:receive-q (:session/socket session)) {:keys [client-event-id]} original-event @@ -402,6 +404,7 @@ ::ex/validation-failed) (receive-queue/enqueue->receive-q q {:op :error + :app-id app-id :status 400 :client-event-id client-event-id :original-event (merge original-event @@ -421,6 +424,7 @@ (tracer/add-exception! instant-ex {:escaping? false}) (receive-queue/enqueue->receive-q q {:op :error + :app-id app-id :status 500 :client-event-id client-event-id :original-event (merge original-event @@ -430,7 +434,7 @@ :hint hint :session-id sess-id}))))) -(defn- handle-uncaught-err [session original-event root-err debug-info] +(defn- handle-uncaught-err [session app-id original-event root-err debug-info] (let [sess-id (:session/id session) q (:receive-q (:session/socket session)) {:keys [client-event-id]} original-event] @@ -438,6 +442,7 @@ (receive-queue/enqueue->receive-q q {:op :error + :app-id app-id :client-event-id client-event-id :status 500 :original-event (merge original-event @@ -496,13 +501,18 @@ (tracer/record-info! {:name "caught-throwable"}) (let [original-event event instant-ex (ex/find-instant-exception e) - root-err (root-cause e)] + root-err (root-cause e) + app-id (some-> (rs/get-auth @store-conn (:session/id session)) + :app + :id)] (cond instant-ex (handle-instant-exception session + app-id original-event instant-ex @debug-info) :else (handle-uncaught-err session + app-id original-event root-err @debug-info)))) diff --git a/server/src/instant/reactive/store.clj b/server/src/instant/reactive/store.clj index dc2c766ea..6d807a80c 100644 --- a/server/src/instant/reactive/store.clj +++ b/server/src/instant/reactive/store.clj @@ -18,7 +18,6 @@ [datascript.core :as d] [instant.util.coll :as ucoll] [instant.lib.ring.websocket :as ws] - [instant.util.async :as ua] [instant.util.tracer :as tracer] [instant.util.exception :as ex])) @@ -565,20 +564,20 @@ ;; ----------------- ;; Websocket Helpers -(defn send-event! [conn sess-id event] +(defn send-event! [conn app-id sess-id event] (let [{:keys [ws-conn]} (get-socket @conn sess-id)] (when-not ws-conn (ex/throw-socket-missing! sess-id)) (try - (ws/send-json! event ws-conn) + (ws/send-json! app-id event ws-conn) (catch java.io.IOException e (ex/throw-socket-error! sess-id e))))) (defn try-send-event! "Does a best-effort send. If it fails, we record and swallow the exception" - [conn sess-id event] + [conn app-id sess-id event] (try - (send-event! conn sess-id event) + (send-event! conn app-id sess-id event) (catch Exception e (tracer/with-span! {:name "rs/try-send-event-swallowed-err"} (tracer/record-exception-span! @@ -587,13 +586,6 @@ :attributes {:event (str event) :escaping? false}}))))) -(defn try-broadcast-event! - "Sends an event to multiple sessions" - [conn sess-ids event] - (ua/vfuture-pmap (fn [sess-id] - (try-send-event! conn sess-id event)) - sess-ids)) - ;; ----- ;; start diff --git a/server/src/instant/session_counter.clj b/server/src/instant/session_counter.clj index 3c38bd416..f4b57a672 100644 --- a/server/src/instant/session_counter.clj +++ b/server/src/instant/session_counter.clj @@ -36,7 +36,7 @@ (into {}))) (defn send-report! [report ws] - (ws/send-json! {:op :report :report report} ws)) + (ws/send-json! nil {:op :report :report report} ws)) ;; ------- ;; Reporter @@ -83,8 +83,8 @@ (if (= token api-key) (do (add-websocket-listener! ws-id channel) (send-report! (store->report @rs/store-conn) channel)) - (ws/send-json! {:op :error - :message "Invalid token"} + (ws/send-json! nil {:op :error + :message "Invalid token"} channel)))) :on-error (fn [{throwable :error}] (remove-websocket-listener! ws-id) diff --git a/server/src/instant/util/logging_exporter.clj b/server/src/instant/util/logging_exporter.clj index d595b12da..8dd4aa13b 100644 --- a/server/src/instant/util/logging_exporter.clj +++ b/server/src/instant/util/logging_exporter.clj @@ -120,12 +120,14 @@ (def exclude-span? (if (= :prod (config/get-env)) (fn [span] - (let [attrs (.getAttributes span)] - (when-let [op (.get attrs op-attr-key)] - (or (= op ":set-presence") - (= op ":refresh-presence") - (= op ":server-broadcast") - (= op ":client-broadcast"))))) + (let [name (.getName span) + attrs (.getAttributes span)] + (or (= name "ws/send-json!") + (when-let [op (.get attrs op-attr-key)] + (or (= op ":set-presence") + (= op ":refresh-presence") + (= op ":server-broadcast") + (= op ":client-broadcast")))))) (fn [_span] false))) diff --git a/server/test/instant/reactive/session_test.clj b/server/test/instant/reactive/session_test.clj index 317b958b4..92cef5dc3 100644 --- a/server/test/instant/reactive/session_test.clj +++ b/server/test/instant/reactive/session_test.clj @@ -59,7 +59,7 @@ *eph-store-atom* eph-store-atom] (with-redefs [receive-queue/receive-q receive-q eph/room-refresh-ch room-refresh-ch - ws/send-json! (fn [msg fake-ws-conn] + ws/send-json! (fn [_app-id msg fake-ws-conn] (a/>!! fake-ws-conn msg)) rq/instaql-query-reactive!