From 1f616515a3897b3a5db1517e04b4880c073e05ee Mon Sep 17 00:00:00 2001 From: Daniel Woelfel Date: Tue, 14 Jan 2025 12:27:47 -0800 Subject: [PATCH] Trace invalidation from start to finish (#710) --- .../sandbox/react-nextjs/pages/play/color.tsx | 24 ++++- server/flags-config/instant.perms.ts | 87 +--------------- server/flags-config/instant.schema.ts | 7 +- server/flags-config/package.json | 4 +- server/flags-config/pnpm-lock.yaml | 98 +++++++++++++++++-- server/refinery/rules.yaml | 1 + server/src/instant/db/transaction.clj | 4 + server/src/instant/flags.clj | 19 +++- server/src/instant/lib/ring/websocket.clj | 94 +++++++++--------- server/src/instant/reactive/invalidator.clj | 22 +++-- server/src/instant/reactive/session.clj | 48 ++++++--- server/src/instant/util/delay.clj | 9 +- server/src/instant/util/e2e_tracer.clj | 74 ++++++++++++++ server/src/instant/util/logging_exporter.clj | 57 ++++++----- server/src/instant/util/tracer.clj | 12 +-- 15 files changed, 354 insertions(+), 206 deletions(-) create mode 100644 server/src/instant/util/e2e_tracer.clj diff --git a/client/sandbox/react-nextjs/pages/play/color.tsx b/client/sandbox/react-nextjs/pages/play/color.tsx index 634803329..852adcb6c 100644 --- a/client/sandbox/react-nextjs/pages/play/color.tsx +++ b/client/sandbox/react-nextjs/pages/play/color.tsx @@ -5,7 +5,7 @@ import config from "../../config"; const schema = i.schema({ entities: { colors: i.entity({ color: i.string() }), - } + }, }); const db = init({ ...config, schema }); @@ -16,6 +16,16 @@ function App() { const selectId = "4d39508b-9ee2-48a3-b70d-8192d9c5a059"; +const colorOptions = ["green", "blue", "purple"]; + +function nextColor(c: string): string { + const i = colorOptions.indexOf(c); + if (i === -1) { + return colorOptions[0]; + } + return colorOptions[(i + 1) % colorOptions.length]; +} + function Main() { useEffect(() => { (async () => { @@ -35,7 +45,7 @@ function Main() {

Hi! pick your favorite color

- {["green", "blue", "purple"].map((c) => { + {colorOptions.map((c) => { return ( ); })} +
diff --git a/server/flags-config/instant.perms.ts b/server/flags-config/instant.perms.ts index 3a1b47e26..1bc6f8709 100644 --- a/server/flags-config/instant.perms.ts +++ b/server/flags-config/instant.perms.ts @@ -6,92 +6,9 @@ const rules = { create: "false", }, }, - "rate-limited-apps": { + $default: { allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "storage-whitelist": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "friend-emails": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "view-checks": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "power-user-emails": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "test-emails": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "promo-emails": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "use-patch-presence": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "drop-refresh-spam": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - custodian: { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", - }, - }, - "team-emails": { - allow: { - view: "false", - create: "false", - delete: "false", - update: "false", + $default: "false", }, }, }; diff --git a/server/flags-config/instant.schema.ts b/server/flags-config/instant.schema.ts index 860e5bedc..5740c39fe 100644 --- a/server/flags-config/instant.schema.ts +++ b/server/flags-config/instant.schema.ts @@ -8,6 +8,9 @@ const graph = i.graph( $users: i.entity({ email: i.string().unique().indexed(), }), + "e2e-logging": i.entity({ + "invalidator-rate": i.number(), + }), "drop-refresh-spam": i.entity({ 
"default-value": i.boolean(), "disabled-apps": i.any(), @@ -29,7 +32,7 @@ const graph = i.graph( email: i.string(), }), "rate-limited-apps": i.entity({ - appId: i.string().unique() + appId: i.string().unique(), }), "storage-whitelist": i.entity({ appId: i.string().unique().indexed(), @@ -47,7 +50,7 @@ const graph = i.graph( // For example, if `posts` should have many `comments`. // More in the docs: // https://www.instantdb.com/docs/schema#defining-links - {}, + {} ); export default graph; diff --git a/server/flags-config/package.json b/server/flags-config/package.json index 689565a74..22bebe1d9 100644 --- a/server/flags-config/package.json +++ b/server/flags-config/package.json @@ -10,7 +10,7 @@ "dev:push": "INSTANT_CLI_DEV=1 instant-cli push" }, "dependencies": { - "@instantdb/core": "^0.14.29", - "instant-cli": "^0.14.29" + "@instantdb/core": "^0.17.7", + "instant-cli": "^0.17.7" } } diff --git a/server/flags-config/pnpm-lock.yaml b/server/flags-config/pnpm-lock.yaml index 76e5a9ece..b0eba208c 100644 --- a/server/flags-config/pnpm-lock.yaml +++ b/server/flags-config/pnpm-lock.yaml @@ -6,11 +6,11 @@ settings: dependencies: '@instantdb/core': - specifier: ^0.14.29 - version: 0.14.29 + specifier: ^0.17.7 + version: 0.17.7 instant-cli: - specifier: ^0.14.29 - version: 0.14.29 + specifier: ^0.17.7 + version: 0.17.7 packages: @@ -234,6 +234,12 @@ packages: dev: false optional: true + /@ewoudenberg/difflib@0.1.0: + resolution: {integrity: sha512-OU5P5mJyD3OoWYMWY+yIgwvgNS9cFAU10f+DDuvtogcWQOoJIsQ4Hy2McSfUfhKjq8L0FuWVb4Rt7kgA+XK86A==} + dependencies: + heap: 0.2.7 + dev: false + /@inquirer/checkbox@2.5.0: resolution: {integrity: sha512-sMgdETOfi2dUHT8r7TT1BTKOwNvdDGFDXYWtQ2J69SvlYNntk9I/gJe7r5yvMwwsuKnYbuRs3pNhx4tgNck5aA==} engines: {node: '>=18'} @@ -253,6 +259,25 @@ packages: '@inquirer/type': 1.5.5 dev: false + /@inquirer/core@9.0.10: + resolution: {integrity: sha512-TdESOKSVwf6+YWDz8GhS6nKscwzkIyakEzCLJ5Vh6O3Co2ClhCJ0A4MG909MUWfaWdpJm7DE45ii51/2Kat9tA==} + engines: 
{node: '>=18'} + dependencies: + '@inquirer/figures': 1.0.7 + '@inquirer/type': 1.5.5 + '@types/mute-stream': 0.0.4 + '@types/node': 22.7.6 + '@types/wrap-ansi': 3.0.0 + ansi-escapes: 4.3.2 + cli-spinners: 2.9.2 + cli-width: 4.1.0 + mute-stream: 1.0.0 + signal-exit: 4.1.0 + strip-ansi: 6.0.1 + wrap-ansi: 6.2.0 + yoctocolors-cjs: 2.1.2 + dev: false + /@inquirer/core@9.2.1: resolution: {integrity: sha512-F2VBt7W/mwqEU4bL0RnHNZmC/OxzNx9cOYxHqnXX3MP6ruYvZUZAW9imgN9+h/uBT/oP8Gh888J2OZSbjSeWcg==} engines: {node: '>=18'} @@ -319,6 +344,22 @@ packages: ansi-escapes: 4.3.2 dev: false + /@inquirer/prompts@5.3.8: + resolution: {integrity: sha512-b2BudQY/Si4Y2a0PdZZL6BeJtl8llgeZa7U2j47aaJSCeAl1e4UI7y8a9bSkO3o/ZbZrgT5muy/34JbsjfIWxA==} + engines: {node: '>=18'} + dependencies: + '@inquirer/checkbox': 2.5.0 + '@inquirer/confirm': 3.2.0 + '@inquirer/editor': 2.2.0 + '@inquirer/expand': 2.3.0 + '@inquirer/input': 2.3.0 + '@inquirer/number': 1.1.0 + '@inquirer/password': 2.2.0 + '@inquirer/rawlist': 2.3.0 + '@inquirer/search': 1.1.0 + '@inquirer/select': 2.5.0 + dev: false + /@inquirer/prompts@5.5.0: resolution: {integrity: sha512-BHDeL0catgHdcHbSFFUddNzvx/imzJMft+tWDPwTm3hfu8/tApk1HrooNngB2Mb4qY+KaRWF+iZqoVUPeslEog==} engines: {node: '>=18'} @@ -379,8 +420,8 @@ packages: mute-stream: 1.0.0 dev: false - /@instantdb/core@0.14.29: - resolution: {integrity: sha512-GyWnNa07QJXy5pdR9lr/xdfoDHJJVe2WYyLHAXeZhn9Fiik6ErxyKmwjvcT8c07jQilwdicR7dbJ9SMh1iQR9g==} + /@instantdb/core@0.17.7: + resolution: {integrity: sha512-5JaBEryf9MFCwoRFX5vNu850n93nbbW+urPc85hpUFZR4M4pU9f7d1qxnvlrDreqNobdzN8VvP+8Yv9uFT7/4Q==} dependencies: mutative: 1.0.11 uuid: 9.0.1 @@ -487,6 +528,11 @@ packages: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} dev: false + /colors@1.4.0: + resolution: {integrity: sha512-a+UqTh4kgZg/SlGvfbzDHpgRu7AAQOmmqRHJnxhRZICKFUT91brVhNNt58CMWU9PsBbv3PDCZUHbVxuDiH2mtA==} + engines: {node: '>=0.1.90'} + dev: false + 
/commander@12.1.0: resolution: {integrity: sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA==} engines: {node: '>=18'} @@ -531,6 +577,13 @@ packages: engines: {node: '>=12'} dev: false + /dreamopt@0.8.0: + resolution: {integrity: sha512-vyJTp8+mC+G+5dfgsY+r3ckxlz+QMX40VjPQsZc5gxVAxLmi64TBoVkP54A/pRAXMXsbu2GMMBrZPxNv23waMg==} + engines: {node: '>=0.4.0'} + dependencies: + wordwrap: 1.0.0 + dev: false + /emoji-regex@10.4.0: resolution: {integrity: sha512-EC+0oUMY1Rqm4O6LLrgjtYDvcVYTy7chDnM4Q7030tP4Kwj3u/pR6gP9ygnp2CJMK5Gq+9Q2oqmrFJAz01DXjw==} dev: false @@ -614,6 +667,10 @@ packages: engines: {node: '>=8'} dev: false + /heap@0.2.7: + resolution: {integrity: sha512-2bsegYkkHO+h/9MGbn6KWcE45cHZgPANo5LXF7EvWdT0yT2EguSVO1nDgU5c8+ZOPwp2vMNa7YFsJhVcDR9Sdg==} + dev: false + /iconv-lite@0.4.24: resolution: {integrity: sha512-v3MXnZAcvnywkTUEZomIActle7RXXeedOR31wwl7VlyoXO4Qi9arvSenNQWne1TcRwhCL1HwLI21bEqdpj8/rA==} engines: {node: '>=0.10.0'} @@ -649,19 +706,23 @@ packages: rxjs: 7.8.1 dev: false - /instant-cli@0.14.29: - resolution: {integrity: sha512-GnYov/Q8jUjFN96rMd91Eg6ucxGd6jQCJnM/MqbeEOPu1KsCWNw5jc+fwfTplyYGlUSGnrbY5SYiJ8EuubwgGQ==} + /instant-cli@0.17.7: + resolution: {integrity: sha512-USNFUD4tK49dqqjBUpIJObyBsYad6ElMUr9V0pZA+QIM8kwbo2By7MJwLXLLWSxpgy0NOXeAlZtPQI77DRB/3A==} hasBin: true dependencies: - '@inquirer/prompts': 5.5.0 + '@inquirer/core': 9.0.10 + '@inquirer/prompts': 5.3.8 + ansi-escapes: 4.3.2 chalk: 5.3.0 commander: 12.1.0 dotenv: 16.4.5 env-paths: 3.0.0 inquirer: 10.2.2 + json-diff: 1.0.6 open: 10.1.0 ora: 8.1.1 pkg-dir: 8.0.0 + prettier: 3.4.2 terminal-link: 3.0.0 unconfig: 0.5.5 transitivePeerDependencies: @@ -719,6 +780,15 @@ packages: hasBin: true dev: false + /json-diff@1.0.6: + resolution: {integrity: sha512-tcFIPRdlc35YkYdGxcamJjllUhXWv4n2rK9oJ2RsAzV4FBkuV4ojKEDgcZ+kpKxDmJKv+PFK65+1tVVOnSeEqA==} + hasBin: true + dependencies: + '@ewoudenberg/difflib': 0.1.0 + colors: 1.4.0 + dreamopt: 0.8.0 + dev: 
false + /load-tsconfig@0.2.5: resolution: {integrity: sha512-IXO6OCs9yg8tMKzfPZ1YmheJbZCiEsnBdcB03l0OcfK9prKnJb96siuHCr5Fl37/yo9DnKU+TLpxzTUspw9shg==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -799,6 +869,12 @@ packages: find-up-simple: 1.0.0 dev: false + /prettier@3.4.2: + resolution: {integrity: sha512-e9MewbtFo+Fevyuxn/4rrcDAaq0IYxPGLvObpQjiZBMAzB9IGmzlnG9RZy3FFas+eBMu2vA0CszMeduow5dIuQ==} + engines: {node: '>=14'} + hasBin: true + dev: false + /resolve-pkg-maps@1.0.0: resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==} dev: false @@ -947,6 +1023,10 @@ packages: hasBin: true dev: false + /wordwrap@1.0.0: + resolution: {integrity: sha512-gvVzJFlPycKc5dZN4yPkP8w7Dc37BtP1yczEneOb4uq34pXZcvrtRTmWV8W+Ume+XCxKgbjM+nevkyFPMybd4Q==} + dev: false + /wrap-ansi@6.2.0: resolution: {integrity: sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA==} engines: {node: '>=8'} diff --git a/server/refinery/rules.yaml b/server/refinery/rules.yaml index 7c4354fd0..fda94b2c4 100644 --- a/server/refinery/rules.yaml +++ b/server/refinery/rules.yaml @@ -20,3 +20,4 @@ Samplers: FieldList: - root.name - root.op + - root.entropy diff --git a/server/src/instant/db/transaction.clj b/server/src/instant/db/transaction.clj index 1f5123415..191cd564e 100644 --- a/server/src/instant/db/transaction.clj +++ b/server/src/instant/db/transaction.clj @@ -10,6 +10,7 @@ [instant.system-catalog :refer [system-catalog-app-id]] [instant.util.coll :as coll] [instant.util.exception :as ex] + [instant.util.e2e-tracer :as e2e-tracer] [instant.util.tracer :as tracer] [next.jdbc :as next-jdbc])) @@ -206,6 +207,9 @@ results-with-on-deletes (enforce-on-deletes conn attrs app-id results) tx (transaction-model/create! conn {:app-id app-id})] + (e2e-tracer/start-invalidator-tracking! {:tx-id (:id tx)}) + (e2e-tracer/invalidator-tracking-step! 
{:tx-id (:id tx) + :name "transact"}) (assoc tx :results results-with-on-deletes))))) (defn transact! diff --git a/server/src/instant/flags.clj b/server/src/instant/flags.clj index 2aad8382f..dc66df3f7 100644 --- a/server/src/instant/flags.clj +++ b/server/src/instant/flags.clj @@ -15,7 +15,8 @@ :use-patch-presence {} :drop-refresh-spam {} :promo-emails {} - :rate-limited-apps {}}) + :rate-limited-apps {} + :e2e-logging {}}) (defn transform-query-result "Function that is called on the query result before it is stored in the @@ -79,13 +80,19 @@ rate-limited-apps (reduce (fn [acc {:strs [appId]}] (conj acc (parse-uuid appId))) #{} - (get result "rate-limited-apps"))] + (get result "rate-limited-apps")) + e2e-logging (when-let [flag (-> (get result "e2e-logging") + first)] + {:invalidator-every-n (try (/ 1 (get flag "invalidator-rate")) + (catch Exception _e + 10000))})] {:emails emails :storage-enabled-whitelist storage-enabled-whitelist :use-patch-presence use-patch-presence :promo-code-emails promo-code-emails :drop-refresh-spam drop-refresh-spam - :rate-limited-apps rate-limited-apps})) + :rate-limited-apps rate-limited-apps + :e2e-logging e2e-logging})) (def queries [{:query query :transform #'transform-query-result}]) @@ -148,3 +155,9 @@ (defn app-rate-limited? [app-id] (contains? (:rate-limited-apps (query-result)) app-id)) + +(defn e2e-should-honeycomb-publish? [^Long tx-id] + (and tx-id + (zero? 
(mod tx-id (or (get-in (query-result) + [:e2e-logging :invalidator-every-n]) + 10000))))) diff --git a/server/src/instant/lib/ring/websocket.clj b/server/src/instant/lib/ring/websocket.clj index c477bc687..38bb42b2c 100644 --- a/server/src/instant/lib/ring/websocket.clj +++ b/server/src/instant/lib/ring/websocket.clj @@ -10,6 +10,7 @@ (:refer-clojure :exclude [send]) (:require [ring.adapter.undertow.headers :refer [set-headers]] [instant.util.json :refer [->json]] + [instant.util.e2e-tracer :as e2e-tracer] [instant.util.tracer :as tracer] [instant.util.delay :as delay]) (:import @@ -30,6 +31,7 @@ [ring.adapter.undertow Util] [clojure.lang IPersistentMap] [io.undertow.websockets.extensions PerMessageDeflateHandshake] + [java.util.concurrent ScheduledFuture] [java.util.concurrent.locks ReentrantLock] [java.util.concurrent.atomic AtomicLong] [java.io IOException] @@ -43,7 +45,9 @@ See `ws-callback` for more details." [{:keys [on-message on-close-message on-error channel-wrapper - atomic-last-received-at atomic-last-ping-at set-ping-latency-nanos]}] + ^AtomicLong atomic-last-received-at + ^AtomicLong atomic-last-ping-at + set-ping-latency-nanos]}] (let [on-message (or on-message (constantly nil)) on-error (or on-error (constantly nil)) on-close-message (or on-close-message (constantly nil))] @@ -76,7 +80,7 @@ (defn try-send-ping-blocking "Tries to send a ping-message. Ignores closed channel exceptions." 
- [channel] + [^WebSocketChannel channel] (try (WebSockets/sendPingBlocking (ByteBuffer/allocate 0) @@ -158,25 +162,25 @@ (reify WebSocketConnectionCallback (^void onConnect [_ ^WebSocketHttpExchange exchange ^WebSocketChannel channel] - (let [ping-job (delay/repeat-fn - ping-pool - ping-interval-ms - (fn [] - (straight-jacket-run-ping-job channel - atomic-last-received-at - atomic-last-ping-at - idle-timeout-ms))) - - close-task (reify ChannelListener - (handleEvent [_this channel] - (.cancel ping-job false) - (on-close (channel-wrapper channel))))] - (.set atomic-last-received-at (System/currentTimeMillis)) - (on-open {:exchange exchange - :channel (channel-wrapper channel)}) - (.addCloseTask channel close-task) - (.set (.getReceiveSetter channel) listener) - (.resumeReceives channel)))))) + (let [^ScheduledFuture ping-job (delay/repeat-fn + ping-pool + ping-interval-ms + (fn [] + (straight-jacket-run-ping-job channel + atomic-last-received-at + atomic-last-ping-at + idle-timeout-ms))) + + close-task (reify ChannelListener + (handleEvent [_this channel] + (.cancel ping-job false) + (on-close (channel-wrapper channel))))] + (.set atomic-last-received-at (System/currentTimeMillis)) + (on-open {:exchange exchange + :channel (channel-wrapper channel)}) + (.addCloseTask channel close-task) + (.set (.getReceiveSetter channel) listener) + (.resumeReceives channel)))))) (defn ws-request [^HttpServerExchange exchange ^IPersistentMap headers ^WebSocketConnectionCallback callback] (let [handler (-> (WebSocketProtocolHandshakeHandler. callback) @@ -187,32 +191,34 @@ (defn send-json! "Serializes `obj` to json, and sends over a websocket." 
- [app-id obj {:keys [websocket-stub undertow-websocket send-lock]}] + [app-id obj {:keys [websocket-stub undertow-websocket ^ReentrantLock send-lock]}] ;; Websockets/sendText _should_ be thread-safe ;; But, t becomes thread-unsafe when we use per-message-deflate ;; Using a `send-lock` to make `send-json!` thread-safe (if websocket-stub (websocket-stub obj) (let [obj-json (->json obj) - p (promise) - _ (try - (tracer/with-span! {:name "ws/send-json!" - :attributes {:size (count obj-json) - :app-id app-id - :send-lock.queue-length (.getQueueLength send-lock) - :send-lock.is-locked (.isLocked send-lock) - :send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}} - (.lock send-lock) - (WebSockets/sendText - ^String obj-json - ^WebSocketChannel undertow-websocket - (proxy [WebSocketCallback] [] - (complete [ws-conn context] - (deliver p nil)) - (onError [ws-conn context throwable] - (deliver p throwable))))) - (finally - (.unlock send-lock))) - ret @p] - (when (instance? Throwable ret) - (throw ret))))) + p (promise)] + (tracer/with-span! {:name "ws/send-json!" + :attributes {:app-id app-id + :size (count obj-json)}} + (try + (.lock send-lock) + (WebSockets/sendText + ^String obj-json + ^WebSocketChannel undertow-websocket + (proxy [WebSocketCallback] [] + (complete [ws-conn context] + (deliver p nil)) + (onError [ws-conn context throwable] + (deliver p throwable)))) + (finally + (.unlock send-lock))) + (let [ret @p] + (when-let [tx-id (-> obj meta :tx-id)] + (e2e-tracer/invalidator-tracking-step! + {:tx-id tx-id + :name "send-json-delivered" + :attributes {:session-id (-> obj meta :session-id)}})) + (when (instance? 
Throwable ret) + (throw ret))))))) diff --git a/server/src/instant/reactive/invalidator.clj b/server/src/instant/reactive/invalidator.clj index 7c1ea2f2f..9cc923316 100644 --- a/server/src/instant/reactive/invalidator.clj +++ b/server/src/instant/reactive/invalidator.clj @@ -14,6 +14,7 @@ [instant.reactive.store :as rs] [instant.util.async :as ua] [instant.util.json :refer [<-json]] + [instant.util.e2e-tracer :as e2e-tracer] [instant.util.tracer :as tracer] [instant.db.model.triple :as triple-model]) (:import @@ -272,12 +273,15 @@ (instant-user-model/evict-user-id-from-cache id))) (when (and some-changes app-id) - {:attr-changes attrs - :ident-changes idents - :triple-changes triples - :app-id app-id - :tx-created-at (extract-tx-created-at transactions-change) - :tx-id (extract-tx-id transactions-change)}))) + (let [tx-id (extract-tx-id transactions-change)] + (e2e-tracer/invalidator-tracking-step! {:tx-id tx-id + :name "transform-wal-record"}) + {:attr-changes attrs + :ident-changes idents + :triple-changes triples + :app-id app-id + :tx-created-at (extract-tx-created-at transactions-change) + :tx-id (extract-tx-id transactions-change)})))) (defn wal-record-xf "Filters wal records for supported changes. Returns [app-id changes]" @@ -319,10 +323,14 @@ (try (let [sockets (invalidate! store-conn wal-record)] (tracer/add-data! {:attributes {:num-sockets (count sockets)}}) + (e2e-tracer/invalidator-tracking-step! {:tx-id tx-id + :name "send-refreshes" + :attributes {:num-sockets (count sockets)}}) (tracer/with-span! 
{:name "invalidator/send-refreshes"} (doseq [{:keys [id]} sockets] (receive-queue/enqueue->receive-q {:op :refresh - :session-id id})))) + :session-id id + :tx-id tx-id})))) (catch Throwable t (def -wal-record wal-record) (def -store-value @store-conn) diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index 8f3e1061a..5fb5f5d7c 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -31,6 +31,7 @@ [instant.util.exception :as ex] [instant.util.json :refer [<-json]] [instant.util.semver :as semver] + [instant.util.e2e-tracer :as e2e-tracer] [instant.util.tracer :as tracer] [instant.util.uuid :as uuid-util] [lambdaisland.uri :as uri]) @@ -167,7 +168,10 @@ :instaql-result instaql-result :result-changed? result-changed?})) -(defn- handle-refresh! [store-conn sess-id _event debug-info] +(defn- handle-refresh! [store-conn sess-id event debug-info] + (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id event) + :name "start-refresh" + :attributes {:session-id sess-id}}) (let [auth (get-auth! store-conn sess-id) app-id (-> auth :app :id) current-user (-> auth :user) @@ -193,17 +197,25 @@ drop-spam? (flags/drop-refresh-spam? app-id) computations (if drop-spam? computations - recompute-results)] + recompute-results) + tracer-attrs {:num-recomputations num-recomputations + :num-spam num-spam + :num-computations num-computations + :dropped-spam? drop-spam?}] + (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id event) + :name "finish-refresh-queries" + :attributes (assoc tracer-attrs + :session-id sess-id)}) (tracer/with-span! {:name "handle-refresh/send-event!" - :attributes {:num-recomputations num-recomputations - :num-spam num-spam - :num-computations num-computations - :dropped-spam? drop-spam?}} + :attributes tracer-attrs} (when (seq computations) - (rs/send-event! 
store-conn app-id sess-id {:op :refresh-ok - :processed-tx-id processed-tx-id - :attrs attrs - :computations computations}))))) + (rs/send-event! store-conn app-id sess-id (with-meta + {:op :refresh-ok + :processed-tx-id processed-tx-id + :attrs attrs + :computations computations} + {:tx-id (:tx-id event) + :session-id sess-id})))))) ;; ----- ;; transact @@ -566,17 +578,21 @@ (#(%)))) (assoc metadata :skipped-size skipped-size))))) -(defmulti consolidate - (fn [type batch] - (if (= 1 (count batch)) - :default - type))) +(defn resolve-consolidate [type batch] + (if (= 1 (count batch)) + :default + type)) + +(defmulti consolidate #'resolve-consolidate) (defmethod consolidate :default [_ batch] batch) (defmethod consolidate :refresh [_ batch] - [(-> (first batch) + (doseq [{:keys [item]} (drop-last batch)] + (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id item) + :name "skipped-refresh"})) + [(-> (last batch) (assoc :skipped-size (dec (count batch))))]) (defmethod consolidate :refresh-presence [_ batch] diff --git a/server/src/instant/util/delay.clj b/server/src/instant/util/delay.clj index 0712218f3..19dcbc1eb 100644 --- a/server/src/instant/util/delay.clj +++ b/server/src/instant/util/delay.clj @@ -1,5 +1,6 @@ (ns instant.util.delay - (:import [java.util.concurrent ScheduledThreadPoolExecutor TimeUnit])) + (:import + (java.util.concurrent Callable ScheduledThreadPoolExecutor TimeUnit))) (defn cpu-count [] (.availableProcessors (Runtime/getRuntime))) @@ -8,11 +9,11 @@ :or {thread-count (+ 2 (cpu-count))}}] (ScheduledThreadPoolExecutor. thread-count)) -(defn delay-fn [thread-pool delay-ms f] +(defn delay-fn [^ScheduledThreadPoolExecutor thread-pool ^Long delay-ms ^Callable f] (.schedule thread-pool f delay-ms TimeUnit/MILLISECONDS)) -(defn repeat-fn [thread-pool delay-ms f] +(defn repeat-fn [^ScheduledThreadPoolExecutor thread-pool delay-ms f] (.scheduleAtFixedRate thread-pool f delay-ms delay-ms TimeUnit/MILLISECONDS)) -(defn shutdown-pool! 
[pool] +(defn shutdown-pool! [^ScheduledThreadPoolExecutor pool] (.shutdown pool)) diff --git a/server/src/instant/util/e2e_tracer.clj b/server/src/instant/util/e2e_tracer.clj new file mode 100644 index 000000000..57c8f2a57 --- /dev/null +++ b/server/src/instant/util/e2e_tracer.clj @@ -0,0 +1,74 @@ +(ns instant.util.e2e-tracer + (:require [instant.util.tracer :as tracer] + [instant.flags :as flags]) + (:import + (io.opentelemetry.api.trace SpanContext) + (io.opentelemetry.sdk.trace SdkSpan) + (java.lang.reflect Field) + (java.nio ByteBuffer) + (org.apache.commons.codec.binary Hex))) + +;; Starts the trace-id with a1 so that it's easy to spot +(def tx-id-magic-prefix ^byte (byte -95)) + +(defn tx-id->trace-id + "Creates a stable trace id for a given tx-id so that we can attach + a series of spans executing on different machines to the same parent." + [^Long tx-id] + (-> (ByteBuffer/allocate 16) ;; 16 bytes for the traceid + (.put ^byte tx-id-magic-prefix) + (.putLong tx-id) + (.array) + (Hex/encodeHexString))) + +(defn tx-id->span-id + "Creates a stable span id for a given tx-id so that we can attach + a series of spans executing on different machines to the same parent." + [^Long tx-id] + (-> (ByteBuffer/allocate 8) ;; 8 bytes for the spanId + (.putLong tx-id) + (.array) + (Hex/encodeHexString))) + +(defn get-field + "Gets a private field from a class instance using reflection." + [^Class cls ^String field-name] + (doto (.getDeclaredField cls field-name) + (.setAccessible true))) + +(def context-field ^Field (get-field SdkSpan "context")) + +(defn make-invalidator-tracking-span [^Long tx-id attrs] + (let [span (binding [tracer/*span* nil] ;; make sure this is a top-level span + (tracer/new-span!
{:name "e2e/invalidator/tracking-span" + :attributes (merge {:tx-id tx-id} + attrs)})) + context (.getSpanContext ^SdkSpan span) + modified-context (SpanContext/create (tx-id->trace-id tx-id) + (tx-id->span-id tx-id) + (.getTraceFlags context) + (.getTraceState context))] + (.set ^Field context-field span modified-context) + span)) + +(defn start-invalidator-tracking! [{:keys [^Long tx-id app-id]}] + (when (flags/e2e-should-honeycomb-publish? tx-id) + (let [span (make-invalidator-tracking-span tx-id {:app-id app-id + ;; encourage honeycomb not + ;; to skip this span + :entropy tx-id})] + (tracer/end-span! span)))) + +(defn invalidator-tracking-step! [{:keys [^Long tx-id] :as span-opts}] + ;; Create a new span with a stable trace-id and span-id for the parent + (when (flags/e2e-should-honeycomb-publish? tx-id) + (binding [tracer/*span* (make-invalidator-tracking-span tx-id nil)] + (tracer/record-info! + (-> span-opts + (update :name (fn [s] (format "e2e/invalidator/%s" s))) + (update :attributes (fn [a] + (merge a + {:tx-id tx-id + ;; encourage honeycomb not + ;; to skip this span + :entropy tx-id})))))))) diff --git a/server/src/instant/util/logging_exporter.clj b/server/src/instant/util/logging_exporter.clj index 25493d431..382b658c9 100644 --- a/server/src/instant/util/logging_exporter.clj +++ b/server/src/instant/util/logging_exporter.clj @@ -5,6 +5,7 @@ [clojure.string :as string] [instant.config :as config]) (:import (io.opentelemetry.api.common AttributeKey) + (io.opentelemetry.api.trace Span) (io.opentelemetry.sdk.common CompletableResultCode) (io.opentelemetry.sdk.trace.export SpanExporter) (java.util.concurrent TimeUnit) @@ -120,33 +121,37 @@ (def exclude-span? (if (= :prod (config/get-env)) + (fn [^Span span] + (let [n (.getName span)] + (case n + ("gc" + "ws/send-json!" + "handle-refresh/send-event!" + "store/record-datalog-query-finish!" + "store/record-datalog-query-start!" + "store/swap-datalog-cache!" + "store/bump-instaql-version!" 
+ "store/add-instaql-query!" + "instaql/get-eid-check-result!" + "extract-permission-helpers" + "instaql/map-permissioned-node") true + + ("receive-worker/handle-event" + "receive-worker/handle-receive") + (case (-> (.getAttributes span) + (.get op-attr-key)) + (":set-presence" + ":refresh-presence" + ":server-broadcast" + ":client-broadcast") true + + false) + + (string/starts-with? n "e2e")))) (fn [span] - (case (.getName span) - ("ws/send-json!" - "handle-refresh/send-event!" - "store/record-datalog-query-finish!" - "store/record-datalog-query-start!" - "store/swap-datalog-cache!" - "store/bump-instaql-version!" - "store/add-instaql-query!" - "instaql/get-eid-check-result!" - "extract-permission-helpers" - "instaql/map-permissioned-node") true - - ("receive-worker/handle-event" - "receive-worker/handle-receive") - (case (-> (.getAttributes span) - (.get op-attr-key)) - (":set-presence" - ":refresh-presence" - ":server-broadcast" - ":client-broadcast") true - - false) - - false)) - (fn [span] - (= (.getName span) "gc")))) + (let [n (.getName span)] + (or (= n "gc") + (string/starts-with? 
n "e2e")))))) (defn log-spans [spans] (doseq [span spans diff --git a/server/src/instant/util/tracer.clj b/server/src/instant/util/tracer.clj index 4fe4881e4..42d4f868e 100644 --- a/server/src/instant/util/tracer.clj +++ b/server/src/instant/util/tracer.clj @@ -1,20 +1,20 @@ (ns instant.util.tracer "Span lib for integrating with Honeycomb" + (:gen-class) (:require [clojure.main :as main] - [instant.util.logging-exporter :as logging-exporter] [instant.config :as config] + [instant.util.logging-exporter :as logging-exporter] [steffan-westcott.clj-otel.api.attributes :as attr]) (:import - (io.opentelemetry.sdk OpenTelemetrySdk) - (io.opentelemetry.api.common Attributes AttributeKey) + (io.opentelemetry.api.common AttributeKey Attributes) (io.opentelemetry.api.trace Span StatusCode) (io.opentelemetry.context Context) (io.opentelemetry.exporter.otlp.trace OtlpGrpcSpanExporter) + (io.opentelemetry.sdk OpenTelemetrySdk) (io.opentelemetry.sdk.resources Resource) - (io.opentelemetry.sdk.trace SdkTracerProvider SdkTracer) - (io.opentelemetry.sdk.trace.export BatchSpanProcessor SimpleSpanProcessor)) - (:gen-class)) + (io.opentelemetry.sdk.trace SdkTracer SdkTracerProvider) + (io.opentelemetry.sdk.trace.export BatchSpanProcessor SimpleSpanProcessor))) (def ^:dynamic *span* nil)