diff --git a/client/sandbox/react-nextjs/pages/play/color.tsx b/client/sandbox/react-nextjs/pages/play/color.tsx
index 634803329..852adcb6c 100644
--- a/client/sandbox/react-nextjs/pages/play/color.tsx
+++ b/client/sandbox/react-nextjs/pages/play/color.tsx
@@ -5,7 +5,7 @@ import config from "../../config";
const schema = i.schema({
entities: {
colors: i.entity({ color: i.string() }),
- }
+ },
});
const db = init({ ...config, schema });
@@ -16,6 +16,16 @@ function App() {
const selectId = "4d39508b-9ee2-48a3-b70d-8192d9c5a059";
+const colorOptions = ["green", "blue", "purple"];
+
+function nextColor(c: string): string {
+ const i = colorOptions.indexOf(c);
+ if (i === -1) {
+ return colorOptions[0];
+ }
+ return colorOptions[(i + 1) % colorOptions.length];
+}
+
function Main() {
useEffect(() => {
(async () => {
@@ -35,7 +45,7 @@ function Main() {
Hi! pick your favorite color
- {["green", "blue", "purple"].map((c) => {
+ {colorOptions.map((c) => {
return (
);
})}
+
diff --git a/server/flags-config/instant.perms.ts b/server/flags-config/instant.perms.ts
index 3a1b47e26..1bc6f8709 100644
--- a/server/flags-config/instant.perms.ts
+++ b/server/flags-config/instant.perms.ts
@@ -6,92 +6,9 @@ const rules = {
create: "false",
},
},
- "rate-limited-apps": {
+ $default: {
allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "storage-whitelist": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "friend-emails": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "view-checks": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "power-user-emails": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "test-emails": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "promo-emails": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "use-patch-presence": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "drop-refresh-spam": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- custodian: {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
- },
- },
- "team-emails": {
- allow: {
- view: "false",
- create: "false",
- delete: "false",
- update: "false",
+ $default: "false",
},
},
};
diff --git a/server/flags-config/instant.schema.ts b/server/flags-config/instant.schema.ts
index 860e5bedc..5740c39fe 100644
--- a/server/flags-config/instant.schema.ts
+++ b/server/flags-config/instant.schema.ts
@@ -8,6 +8,9 @@ const graph = i.graph(
$users: i.entity({
email: i.string().unique().indexed(),
}),
+ "e2e-logging": i.entity({
+ "invalidator-rate": i.number(),
+ }),
"drop-refresh-spam": i.entity({
"default-value": i.boolean(),
"disabled-apps": i.any(),
@@ -29,7 +32,7 @@ const graph = i.graph(
email: i.string(),
}),
"rate-limited-apps": i.entity({
- appId: i.string().unique()
+ appId: i.string().unique(),
}),
"storage-whitelist": i.entity({
appId: i.string().unique().indexed(),
@@ -47,7 +50,7 @@ const graph = i.graph(
// For example, if `posts` should have many `comments`.
// More in the docs:
// https://www.instantdb.com/docs/schema#defining-links
- {},
+ {}
);
export default graph;
diff --git a/server/flags-config/package.json b/server/flags-config/package.json
index 689565a74..22bebe1d9 100644
--- a/server/flags-config/package.json
+++ b/server/flags-config/package.json
@@ -10,7 +10,7 @@
"dev:push": "INSTANT_CLI_DEV=1 instant-cli push"
},
"dependencies": {
- "@instantdb/core": "^0.14.29",
- "instant-cli": "^0.14.29"
+ "@instantdb/core": "^0.17.7",
+ "instant-cli": "^0.17.7"
}
}
diff --git a/server/flags-config/pnpm-lock.yaml b/server/flags-config/pnpm-lock.yaml
index 76e5a9ece..b0eba208c 100644
--- a/server/flags-config/pnpm-lock.yaml
+++ b/server/flags-config/pnpm-lock.yaml
@@ -6,11 +6,11 @@ settings:
dependencies:
'@instantdb/core':
- specifier: ^0.14.29
- version: 0.14.29
+ specifier: ^0.17.7
+ version: 0.17.7
instant-cli:
- specifier: ^0.14.29
- version: 0.14.29
+ specifier: ^0.17.7
+ version: 0.17.7
packages:
@@ -234,6 +234,12 @@ packages:
dev: false
optional: true
+ /@ewoudenberg/difflib@0.1.0:
+ resolution: {integrity: sha512-OU5P5mJyD3OoWYMWY+yIgwvgNS9cFAU10f+DDuvtogcWQOoJIsQ4Hy2McSfUfhKjq8L0FuWVb4Rt7kgA+XK86A==}
+ dependencies:
+ heap: 0.2.7
+ dev: false
+
/@inquirer/checkbox@2.5.0:
resolution: {integrity: sha512-sMgdETOfi2dUHT8r7TT1BTKOwNvdDGFDXYWtQ2J69SvlYNntk9I/gJe7r5yvMwwsuKnYbuRs3pNhx4tgNck5aA==}
engines: {node: '>=18'}
@@ -253,6 +259,25 @@ packages:
'@inquirer/type': 1.5.5
dev: false
+ /@inquirer/core@9.0.10:
+ resolution: {integrity: sha512-TdESOKSVwf6+YWDz8GhS6nKscwzkIyakEzCLJ5Vh6O3Co2ClhCJ0A4MG909MUWfaWdpJm7DE45ii51/2Kat9tA==}
+ engines: {node: '>=18'}
+ dependencies:
+ '@inquirer/figures': 1.0.7
+ '@inquirer/type': 1.5.5
+ '@types/mute-stream': 0.0.4
+ '@types/node': 22.7.6
+ '@types/wrap-ansi': 3.0.0
+ ansi-escapes: 4.3.2
+ cli-spinners: 2.9.2
+ cli-width: 4.1.0
+ mute-stream: 1.0.0
+ signal-exit: 4.1.0
+ strip-ansi: 6.0.1
+ wrap-ansi: 6.2.0
+ yoctocolors-cjs: 2.1.2
+ dev: false
+
/@inquirer/core@9.2.1:
resolution: {integrity: sha512-F2VBt7W/mwqEU4bL0RnHNZmC/OxzNx9cOYxHqnXX3MP6ruYvZUZAW9imgN9+h/uBT/oP8Gh888J2OZSbjSeWcg==}
engines: {node: '>=18'}
@@ -319,6 +344,22 @@ packages:
ansi-escapes: 4.3.2
dev: false
+ /@inquirer/prompts@5.3.8:
+ resolution: {integrity: sha512-b2BudQY/Si4Y2a0PdZZL6BeJtl8llgeZa7U2j47aaJSCeAl1e4UI7y8a9bSkO3o/ZbZrgT5muy/34JbsjfIWxA==}
+ engines: {node: '>=18'}
+ dependencies:
+ '@inquirer/checkbox': 2.5.0
+ '@inquirer/confirm': 3.2.0
+ '@inquirer/editor': 2.2.0
+ '@inquirer/expand': 2.3.0
+ '@inquirer/input': 2.3.0
+ '@inquirer/number': 1.1.0
+ '@inquirer/password': 2.2.0
+ '@inquirer/rawlist': 2.3.0
+ '@inquirer/search': 1.1.0
+ '@inquirer/select': 2.5.0
+ dev: false
+
/@inquirer/prompts@5.5.0:
resolution: {integrity: sha512-BHDeL0catgHdcHbSFFUddNzvx/imzJMft+tWDPwTm3hfu8/tApk1HrooNngB2Mb4qY+KaRWF+iZqoVUPeslEog==}
engines: {node: '>=18'}
@@ -379,8 +420,8 @@ packages:
mute-stream: 1.0.0
dev: false
- /@instantdb/core@0.14.29:
- resolution: {integrity: sha512-GyWnNa07QJXy5pdR9lr/xdfoDHJJVe2WYyLHAXeZhn9Fiik6ErxyKmwjvcT8c07jQilwdicR7dbJ9SMh1iQR9g==}
+ /@instantdb/core@0.17.7:
+ resolution: {integrity: sha512-5JaBEryf9MFCwoRFX5vNu850n93nbbW+urPc85hpUFZR4M4pU9f7d1qxnvlrDreqNobdzN8VvP+8Yv9uFT7/4Q==}
dependencies:
mutative: 1.0.11
uuid: 9.0.1
@@ -487,6 +528,11 @@ packages:
resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==}
dev: false
+ /colors@1.4.0:
+ resolution: {integrity: sha512-a+UqTh4kgZg/SlGvfbzDHpgRu7AAQOmmqRHJnxhRZICKFUT91brVhNNt58CMWU9PsBbv3PDCZUHbVxuDiH2mtA==}
+ engines: {node: '>=0.1.90'}
+ dev: false
+
/commander@12.1.0:
resolution: {integrity: sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA==}
engines: {node: '>=18'}
@@ -531,6 +577,13 @@ packages:
engines: {node: '>=12'}
dev: false
+ /dreamopt@0.8.0:
+ resolution: {integrity: sha512-vyJTp8+mC+G+5dfgsY+r3ckxlz+QMX40VjPQsZc5gxVAxLmi64TBoVkP54A/pRAXMXsbu2GMMBrZPxNv23waMg==}
+ engines: {node: '>=0.4.0'}
+ dependencies:
+ wordwrap: 1.0.0
+ dev: false
+
/emoji-regex@10.4.0:
resolution: {integrity: sha512-EC+0oUMY1Rqm4O6LLrgjtYDvcVYTy7chDnM4Q7030tP4Kwj3u/pR6gP9ygnp2CJMK5Gq+9Q2oqmrFJAz01DXjw==}
dev: false
@@ -614,6 +667,10 @@ packages:
engines: {node: '>=8'}
dev: false
+ /heap@0.2.7:
+ resolution: {integrity: sha512-2bsegYkkHO+h/9MGbn6KWcE45cHZgPANo5LXF7EvWdT0yT2EguSVO1nDgU5c8+ZOPwp2vMNa7YFsJhVcDR9Sdg==}
+ dev: false
+
/iconv-lite@0.4.24:
resolution: {integrity: sha512-v3MXnZAcvnywkTUEZomIActle7RXXeedOR31wwl7VlyoXO4Qi9arvSenNQWne1TcRwhCL1HwLI21bEqdpj8/rA==}
engines: {node: '>=0.10.0'}
@@ -649,19 +706,23 @@ packages:
rxjs: 7.8.1
dev: false
- /instant-cli@0.14.29:
- resolution: {integrity: sha512-GnYov/Q8jUjFN96rMd91Eg6ucxGd6jQCJnM/MqbeEOPu1KsCWNw5jc+fwfTplyYGlUSGnrbY5SYiJ8EuubwgGQ==}
+ /instant-cli@0.17.7:
+ resolution: {integrity: sha512-USNFUD4tK49dqqjBUpIJObyBsYad6ElMUr9V0pZA+QIM8kwbo2By7MJwLXLLWSxpgy0NOXeAlZtPQI77DRB/3A==}
hasBin: true
dependencies:
- '@inquirer/prompts': 5.5.0
+ '@inquirer/core': 9.0.10
+ '@inquirer/prompts': 5.3.8
+ ansi-escapes: 4.3.2
chalk: 5.3.0
commander: 12.1.0
dotenv: 16.4.5
env-paths: 3.0.0
inquirer: 10.2.2
+ json-diff: 1.0.6
open: 10.1.0
ora: 8.1.1
pkg-dir: 8.0.0
+ prettier: 3.4.2
terminal-link: 3.0.0
unconfig: 0.5.5
transitivePeerDependencies:
@@ -719,6 +780,15 @@ packages:
hasBin: true
dev: false
+ /json-diff@1.0.6:
+ resolution: {integrity: sha512-tcFIPRdlc35YkYdGxcamJjllUhXWv4n2rK9oJ2RsAzV4FBkuV4ojKEDgcZ+kpKxDmJKv+PFK65+1tVVOnSeEqA==}
+ hasBin: true
+ dependencies:
+ '@ewoudenberg/difflib': 0.1.0
+ colors: 1.4.0
+ dreamopt: 0.8.0
+ dev: false
+
/load-tsconfig@0.2.5:
resolution: {integrity: sha512-IXO6OCs9yg8tMKzfPZ1YmheJbZCiEsnBdcB03l0OcfK9prKnJb96siuHCr5Fl37/yo9DnKU+TLpxzTUspw9shg==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
@@ -799,6 +869,12 @@ packages:
find-up-simple: 1.0.0
dev: false
+ /prettier@3.4.2:
+ resolution: {integrity: sha512-e9MewbtFo+Fevyuxn/4rrcDAaq0IYxPGLvObpQjiZBMAzB9IGmzlnG9RZy3FFas+eBMu2vA0CszMeduow5dIuQ==}
+ engines: {node: '>=14'}
+ hasBin: true
+ dev: false
+
/resolve-pkg-maps@1.0.0:
resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==}
dev: false
@@ -947,6 +1023,10 @@ packages:
hasBin: true
dev: false
+ /wordwrap@1.0.0:
+ resolution: {integrity: sha512-gvVzJFlPycKc5dZN4yPkP8w7Dc37BtP1yczEneOb4uq34pXZcvrtRTmWV8W+Ume+XCxKgbjM+nevkyFPMybd4Q==}
+ dev: false
+
/wrap-ansi@6.2.0:
resolution: {integrity: sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA==}
engines: {node: '>=8'}
diff --git a/server/refinery/rules.yaml b/server/refinery/rules.yaml
index 7c4354fd0..fda94b2c4 100644
--- a/server/refinery/rules.yaml
+++ b/server/refinery/rules.yaml
@@ -20,3 +20,4 @@ Samplers:
FieldList:
- root.name
- root.op
+ - root.entropy
diff --git a/server/src/instant/db/transaction.clj b/server/src/instant/db/transaction.clj
index 1f5123415..191cd564e 100644
--- a/server/src/instant/db/transaction.clj
+++ b/server/src/instant/db/transaction.clj
@@ -10,6 +10,7 @@
[instant.system-catalog :refer [system-catalog-app-id]]
[instant.util.coll :as coll]
[instant.util.exception :as ex]
+ [instant.util.e2e-tracer :as e2e-tracer]
[instant.util.tracer :as tracer]
[next.jdbc :as next-jdbc]))
@@ -206,6 +207,9 @@
results-with-on-deletes (enforce-on-deletes conn attrs app-id results)
tx (transaction-model/create! conn {:app-id app-id})]
+ (e2e-tracer/start-invalidator-tracking! {:tx-id (:id tx)})
+ (e2e-tracer/invalidator-tracking-step! {:tx-id (:id tx)
+ :name "transact"})
(assoc tx :results results-with-on-deletes)))))
(defn transact!
diff --git a/server/src/instant/flags.clj b/server/src/instant/flags.clj
index 2aad8382f..dc66df3f7 100644
--- a/server/src/instant/flags.clj
+++ b/server/src/instant/flags.clj
@@ -15,7 +15,8 @@
:use-patch-presence {}
:drop-refresh-spam {}
:promo-emails {}
- :rate-limited-apps {}})
+ :rate-limited-apps {}
+ :e2e-logging {}})
(defn transform-query-result
"Function that is called on the query result before it is stored in the
@@ -79,13 +80,19 @@
rate-limited-apps (reduce (fn [acc {:strs [appId]}]
(conj acc (parse-uuid appId)))
#{}
- (get result "rate-limited-apps"))]
+ (get result "rate-limited-apps"))
+ e2e-logging (when-let [flag (-> (get result "e2e-logging")
+ first)]
+ {:invalidator-every-n (try (/ 1 (get flag "invalidator-rate"))
+ (catch Exception _e
+ 10000))})]
{:emails emails
:storage-enabled-whitelist storage-enabled-whitelist
:use-patch-presence use-patch-presence
:promo-code-emails promo-code-emails
:drop-refresh-spam drop-refresh-spam
- :rate-limited-apps rate-limited-apps}))
+ :rate-limited-apps rate-limited-apps
+ :e2e-logging e2e-logging}))
(def queries [{:query query :transform #'transform-query-result}])
@@ -148,3 +155,9 @@
(defn app-rate-limited? [app-id]
(contains? (:rate-limited-apps (query-result))
app-id))
+
+(defn e2e-should-honeycomb-publish? [^Long tx-id]
+ (and tx-id
+ (zero? (mod tx-id (or (get-in (query-result)
+ [:e2e-logging :invalidator-every-n])
+ 10000)))))
diff --git a/server/src/instant/lib/ring/websocket.clj b/server/src/instant/lib/ring/websocket.clj
index c477bc687..38bb42b2c 100644
--- a/server/src/instant/lib/ring/websocket.clj
+++ b/server/src/instant/lib/ring/websocket.clj
@@ -10,6 +10,7 @@
(:refer-clojure :exclude [send])
(:require [ring.adapter.undertow.headers :refer [set-headers]]
[instant.util.json :refer [->json]]
+ [instant.util.e2e-tracer :as e2e-tracer]
[instant.util.tracer :as tracer]
[instant.util.delay :as delay])
(:import
@@ -30,6 +31,7 @@
[ring.adapter.undertow Util]
[clojure.lang IPersistentMap]
[io.undertow.websockets.extensions PerMessageDeflateHandshake]
+ [java.util.concurrent ScheduledFuture]
[java.util.concurrent.locks ReentrantLock]
[java.util.concurrent.atomic AtomicLong]
[java.io IOException]
@@ -43,7 +45,9 @@
See `ws-callback` for more details."
[{:keys [on-message on-close-message on-error channel-wrapper
- atomic-last-received-at atomic-last-ping-at set-ping-latency-nanos]}]
+ ^AtomicLong atomic-last-received-at
+ ^AtomicLong atomic-last-ping-at
+ set-ping-latency-nanos]}]
(let [on-message (or on-message (constantly nil))
on-error (or on-error (constantly nil))
on-close-message (or on-close-message (constantly nil))]
@@ -76,7 +80,7 @@
(defn try-send-ping-blocking
"Tries to send a ping-message. Ignores closed channel exceptions."
- [channel]
+ [^WebSocketChannel channel]
(try
(WebSockets/sendPingBlocking
(ByteBuffer/allocate 0)
@@ -158,25 +162,25 @@
(reify WebSocketConnectionCallback
(^void onConnect [_ ^WebSocketHttpExchange exchange ^WebSocketChannel channel]
- (let [ping-job (delay/repeat-fn
- ping-pool
- ping-interval-ms
- (fn []
- (straight-jacket-run-ping-job channel
- atomic-last-received-at
- atomic-last-ping-at
- idle-timeout-ms)))
-
- close-task (reify ChannelListener
- (handleEvent [_this channel]
- (.cancel ping-job false)
- (on-close (channel-wrapper channel))))]
- (.set atomic-last-received-at (System/currentTimeMillis))
- (on-open {:exchange exchange
- :channel (channel-wrapper channel)})
- (.addCloseTask channel close-task)
- (.set (.getReceiveSetter channel) listener)
- (.resumeReceives channel))))))
+ (let [^ScheduledFuture ping-job (delay/repeat-fn
+ ping-pool
+ ping-interval-ms
+ (fn []
+ (straight-jacket-run-ping-job channel
+ atomic-last-received-at
+ atomic-last-ping-at
+ idle-timeout-ms)))
+
+ close-task (reify ChannelListener
+ (handleEvent [_this channel]
+ (.cancel ping-job false)
+ (on-close (channel-wrapper channel))))]
+ (.set atomic-last-received-at (System/currentTimeMillis))
+ (on-open {:exchange exchange
+ :channel (channel-wrapper channel)})
+ (.addCloseTask channel close-task)
+ (.set (.getReceiveSetter channel) listener)
+ (.resumeReceives channel))))))
(defn ws-request [^HttpServerExchange exchange ^IPersistentMap headers ^WebSocketConnectionCallback callback]
(let [handler (-> (WebSocketProtocolHandshakeHandler. callback)
@@ -187,32 +191,34 @@
(defn send-json!
"Serializes `obj` to json, and sends over a websocket."
- [app-id obj {:keys [websocket-stub undertow-websocket send-lock]}]
+ [app-id obj {:keys [websocket-stub undertow-websocket ^ReentrantLock send-lock]}]
;; Websockets/sendText _should_ be thread-safe
;; But, t becomes thread-unsafe when we use per-message-deflate
;; Using a `send-lock` to make `send-json!` thread-safe
(if websocket-stub
(websocket-stub obj)
(let [obj-json (->json obj)
- p (promise)
- _ (try
- (tracer/with-span! {:name "ws/send-json!"
- :attributes {:size (count obj-json)
- :app-id app-id
- :send-lock.queue-length (.getQueueLength send-lock)
- :send-lock.is-locked (.isLocked send-lock)
- :send-lock.held-by-current-thread (.isHeldByCurrentThread send-lock)}}
- (.lock send-lock)
- (WebSockets/sendText
- ^String obj-json
- ^WebSocketChannel undertow-websocket
- (proxy [WebSocketCallback] []
- (complete [ws-conn context]
- (deliver p nil))
- (onError [ws-conn context throwable]
- (deliver p throwable)))))
- (finally
- (.unlock send-lock)))
- ret @p]
- (when (instance? Throwable ret)
- (throw ret)))))
+ p (promise)]
+ (tracer/with-span! {:name "ws/send-json!"
+ :attributes {:app-id app-id
+ :size (count obj-json)}}
+ (try
+ (.lock send-lock)
+ (WebSockets/sendText
+ ^String obj-json
+ ^WebSocketChannel undertow-websocket
+ (proxy [WebSocketCallback] []
+ (complete [ws-conn context]
+ (deliver p nil))
+ (onError [ws-conn context throwable]
+ (deliver p throwable))))
+ (finally
+ (.unlock send-lock)))
+ (let [ret @p]
+ (when-let [tx-id (-> obj meta :tx-id)]
+ (e2e-tracer/invalidator-tracking-step!
+ {:tx-id tx-id
+ :name "send-json-delivered"
+ :attributes {:session-id (-> obj meta :session-id)}}))
+ (when (instance? Throwable ret)
+ (throw ret)))))))
diff --git a/server/src/instant/reactive/invalidator.clj b/server/src/instant/reactive/invalidator.clj
index 7c1ea2f2f..9cc923316 100644
--- a/server/src/instant/reactive/invalidator.clj
+++ b/server/src/instant/reactive/invalidator.clj
@@ -14,6 +14,7 @@
[instant.reactive.store :as rs]
[instant.util.async :as ua]
[instant.util.json :refer [<-json]]
+ [instant.util.e2e-tracer :as e2e-tracer]
[instant.util.tracer :as tracer]
[instant.db.model.triple :as triple-model])
(:import
@@ -272,12 +273,15 @@
(instant-user-model/evict-user-id-from-cache id)))
(when (and some-changes app-id)
- {:attr-changes attrs
- :ident-changes idents
- :triple-changes triples
- :app-id app-id
- :tx-created-at (extract-tx-created-at transactions-change)
- :tx-id (extract-tx-id transactions-change)})))
+ (let [tx-id (extract-tx-id transactions-change)]
+ (e2e-tracer/invalidator-tracking-step! {:tx-id tx-id
+ :name "transform-wal-record"})
+ {:attr-changes attrs
+ :ident-changes idents
+ :triple-changes triples
+ :app-id app-id
+ :tx-created-at (extract-tx-created-at transactions-change)
+ :tx-id (extract-tx-id transactions-change)}))))
(defn wal-record-xf
"Filters wal records for supported changes. Returns [app-id changes]"
@@ -319,10 +323,14 @@
(try
(let [sockets (invalidate! store-conn wal-record)]
(tracer/add-data! {:attributes {:num-sockets (count sockets)}})
+ (e2e-tracer/invalidator-tracking-step! {:tx-id tx-id
+ :name "send-refreshes"
+ :attributes {:num-sockets (count sockets)}})
(tracer/with-span! {:name "invalidator/send-refreshes"}
(doseq [{:keys [id]} sockets]
(receive-queue/enqueue->receive-q {:op :refresh
- :session-id id}))))
+ :session-id id
+ :tx-id tx-id}))))
(catch Throwable t
(def -wal-record wal-record)
(def -store-value @store-conn)
diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj
index 8f3e1061a..5fb5f5d7c 100644
--- a/server/src/instant/reactive/session.clj
+++ b/server/src/instant/reactive/session.clj
@@ -31,6 +31,7 @@
[instant.util.exception :as ex]
[instant.util.json :refer [<-json]]
[instant.util.semver :as semver]
+ [instant.util.e2e-tracer :as e2e-tracer]
[instant.util.tracer :as tracer]
[instant.util.uuid :as uuid-util]
[lambdaisland.uri :as uri])
@@ -167,7 +168,10 @@
:instaql-result instaql-result
:result-changed? result-changed?}))
-(defn- handle-refresh! [store-conn sess-id _event debug-info]
+(defn- handle-refresh! [store-conn sess-id event debug-info]
+ (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id event)
+ :name "start-refresh"
+ :attributes {:session-id sess-id}})
(let [auth (get-auth! store-conn sess-id)
app-id (-> auth :app :id)
current-user (-> auth :user)
@@ -193,17 +197,25 @@
drop-spam? (flags/drop-refresh-spam? app-id)
computations (if drop-spam?
computations
- recompute-results)]
+ recompute-results)
+ tracer-attrs {:num-recomputations num-recomputations
+ :num-spam num-spam
+ :num-computations num-computations
+ :dropped-spam? drop-spam?}]
+ (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id event)
+ :name "finish-refresh-queries"
+ :attributes (assoc tracer-attrs
+ :session-id sess-id)})
(tracer/with-span! {:name "handle-refresh/send-event!"
- :attributes {:num-recomputations num-recomputations
- :num-spam num-spam
- :num-computations num-computations
- :dropped-spam? drop-spam?}}
+ :attributes tracer-attrs}
(when (seq computations)
- (rs/send-event! store-conn app-id sess-id {:op :refresh-ok
- :processed-tx-id processed-tx-id
- :attrs attrs
- :computations computations})))))
+ (rs/send-event! store-conn app-id sess-id (with-meta
+ {:op :refresh-ok
+ :processed-tx-id processed-tx-id
+ :attrs attrs
+ :computations computations}
+ {:tx-id (:tx-id event)
+ :session-id sess-id}))))))
;; -----
;; transact
@@ -566,17 +578,21 @@
(#(%))))
(assoc metadata :skipped-size skipped-size)))))
-(defmulti consolidate
- (fn [type batch]
- (if (= 1 (count batch))
- :default
- type)))
+(defn resolve-consolidate [type batch]
+ (if (= 1 (count batch))
+ :default
+ type))
+
+(defmulti consolidate #'resolve-consolidate)
(defmethod consolidate :default [_ batch]
batch)
(defmethod consolidate :refresh [_ batch]
- [(-> (first batch)
+ (doseq [{:keys [item]} (drop-last batch)]
+ (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id item)
+ :name "skipped-refresh"}))
+ [(-> (last batch)
(assoc :skipped-size (dec (count batch))))])
(defmethod consolidate :refresh-presence [_ batch]
diff --git a/server/src/instant/util/delay.clj b/server/src/instant/util/delay.clj
index 0712218f3..19dcbc1eb 100644
--- a/server/src/instant/util/delay.clj
+++ b/server/src/instant/util/delay.clj
@@ -1,5 +1,6 @@
(ns instant.util.delay
- (:import [java.util.concurrent ScheduledThreadPoolExecutor TimeUnit]))
+ (:import
+ (java.util.concurrent Callable ScheduledThreadPoolExecutor TimeUnit)))
(defn cpu-count []
(.availableProcessors (Runtime/getRuntime)))
@@ -8,11 +9,11 @@
:or {thread-count (+ 2 (cpu-count))}}]
(ScheduledThreadPoolExecutor. thread-count))
-(defn delay-fn [thread-pool delay-ms f]
+(defn delay-fn [^ScheduledThreadPoolExecutor thread-pool ^Long delay-ms ^Callable f]
(.schedule thread-pool f delay-ms TimeUnit/MILLISECONDS))
-(defn repeat-fn [thread-pool delay-ms f]
+(defn repeat-fn [^ScheduledThreadPoolExecutor thread-pool delay-ms f]
(.scheduleAtFixedRate thread-pool f delay-ms delay-ms TimeUnit/MILLISECONDS))
-(defn shutdown-pool! [pool]
+(defn shutdown-pool! [^ScheduledThreadPoolExecutor pool]
(.shutdown pool))
diff --git a/server/src/instant/util/e2e_tracer.clj b/server/src/instant/util/e2e_tracer.clj
new file mode 100644
index 000000000..57c8f2a57
--- /dev/null
+++ b/server/src/instant/util/e2e_tracer.clj
@@ -0,0 +1,74 @@
+(ns instant.util.e2e-tracer
+ (:require [instant.util.tracer :as tracer]
+ [instant.flags :as flags])
+ (:import
+ (io.opentelemetry.api.trace SpanContext)
+ (io.opentelemetry.sdk.trace SdkSpan)
+ (java.lang.reflect Field)
+ (java.nio ByteBuffer)
+ (org.apache.commons.codec.binary Hex)))
+
+;; Starts the trace-id with a1 so that it's easy to spot
+(def tx-id-magic-prefix ^byte (byte -95))
+
+(defn tx-id->trace-id
+ "Creates a stable trace id for a given tx-id so that we can attach
+ a series of spans executing on different machines to the same parent."
+ [^Long tx-id]
+ (-> (ByteBuffer/allocate 16) ;; 16 bytes for the traceid
+ (.put ^byte tx-id-magic-prefix)
+ (.putLong tx-id)
+ (.array)
+ (Hex/encodeHexString)))
+
+(defn tx-id->span-id
+ "Creates a stable span id for a given tx-id so that we can attach
+ a series of spans executing on different machines to the same parent."
+ [^Long tx-id]
+ (-> (ByteBuffer/allocate 8) ;; 16 bytes for the spanId
+ (.putLong tx-id)
+ (.array)
+ (Hex/encodeHexString)))
+
+(defn get-field
+ "Gets a private field from a class instance using reflection."
+ [^Class cls ^String field-name]
+ (doto (.getDeclaredField cls field-name)
+ (.setAccessible true)))
+
+(def context-field ^Field (get-field SdkSpan "context"))
+
+(defn make-invalidator-tracking-span [^Long tx-id attrs]
+ (let [span (binding [tracer/*span* nil] ;; make sure this is a top-level span
+ (tracer/new-span! {:name "e2e/invalidator/tracking-span"
+ :attributes (merge {:tx-id tx-id}
+ attrs)}))
+ context (.getSpanContext ^SdkSpan span)
+ modified-context (SpanContext/create (tx-id->trace-id tx-id)
+ (tx-id->span-id tx-id)
+ (.getTraceFlags context)
+ (.getTraceState context))]
+ (.set ^Field context-field span modified-context)
+ span))
+
+(defn start-invalidator-tracking! [{:keys [^Long tx-id app-id]}]
+ (when (flags/e2e-should-honeycomb-publish? tx-id)
+ (let [span (make-invalidator-tracking-span tx-id {:app-id app-id
+ ;; encourage honeycomb not
+ ;; to skip this span
+ :entropy tx-id})]
+ (tracer/end-span! span))))
+
+(defn invalidator-tracking-step! [{:keys [^Long tx-id] :as span-opts}]
+ ;; Create a new span with a stable trace-id and span-id for the parent
+ (when (flags/e2e-should-honeycomb-publish? tx-id)
+ (binding [tracer/*span* (make-invalidator-tracking-span tx-id nil)]
+ (tracer/record-info!
+ (-> span-opts
+ (update :name (fn [s] (format "e2e/invalidator/%s" s)))
+ (update :attributes (fn [a]
+ (merge a
+ {:tx-id tx-id
+ ;; encourage honeycomb not
+ ;; to skip this span
+ :entropy tx-id}))))))))
diff --git a/server/src/instant/util/logging_exporter.clj b/server/src/instant/util/logging_exporter.clj
index 25493d431..382b658c9 100644
--- a/server/src/instant/util/logging_exporter.clj
+++ b/server/src/instant/util/logging_exporter.clj
@@ -5,6 +5,7 @@
[clojure.string :as string]
[instant.config :as config])
(:import (io.opentelemetry.api.common AttributeKey)
+ (io.opentelemetry.api.trace Span)
(io.opentelemetry.sdk.common CompletableResultCode)
(io.opentelemetry.sdk.trace.export SpanExporter)
(java.util.concurrent TimeUnit)
@@ -120,33 +121,37 @@
(def exclude-span?
(if (= :prod (config/get-env))
+ (fn [^Span span]
+ (let [n (.getName span)]
+ (case n
+ ("gc"
+ "ws/send-json!"
+ "handle-refresh/send-event!"
+ "store/record-datalog-query-finish!"
+ "store/record-datalog-query-start!"
+ "store/swap-datalog-cache!"
+ "store/bump-instaql-version!"
+ "store/add-instaql-query!"
+ "instaql/get-eid-check-result!"
+ "extract-permission-helpers"
+ "instaql/map-permissioned-node") true
+
+ ("receive-worker/handle-event"
+ "receive-worker/handle-receive")
+ (case (-> (.getAttributes span)
+ (.get op-attr-key))
+ (":set-presence"
+ ":refresh-presence"
+ ":server-broadcast"
+ ":client-broadcast") true
+
+ false)
+
+ (string/starts-with? n "e2e"))))
(fn [span]
- (case (.getName span)
- ("ws/send-json!"
- "handle-refresh/send-event!"
- "store/record-datalog-query-finish!"
- "store/record-datalog-query-start!"
- "store/swap-datalog-cache!"
- "store/bump-instaql-version!"
- "store/add-instaql-query!"
- "instaql/get-eid-check-result!"
- "extract-permission-helpers"
- "instaql/map-permissioned-node") true
-
- ("receive-worker/handle-event"
- "receive-worker/handle-receive")
- (case (-> (.getAttributes span)
- (.get op-attr-key))
- (":set-presence"
- ":refresh-presence"
- ":server-broadcast"
- ":client-broadcast") true
-
- false)
-
- false))
- (fn [span]
- (= (.getName span) "gc"))))
+ (let [n (.getName span)]
+ (or (= n "gc")
+ (string/starts-with? n "e2e"))))))
(defn log-spans [spans]
(doseq [span spans
diff --git a/server/src/instant/util/tracer.clj b/server/src/instant/util/tracer.clj
index 4fe4881e4..42d4f868e 100644
--- a/server/src/instant/util/tracer.clj
+++ b/server/src/instant/util/tracer.clj
@@ -1,20 +1,20 @@
(ns instant.util.tracer
"Span lib for integrating with Honeycomb"
+ (:gen-class)
(:require
[clojure.main :as main]
- [instant.util.logging-exporter :as logging-exporter]
[instant.config :as config]
+ [instant.util.logging-exporter :as logging-exporter]
[steffan-westcott.clj-otel.api.attributes :as attr])
(:import
- (io.opentelemetry.sdk OpenTelemetrySdk)
- (io.opentelemetry.api.common Attributes AttributeKey)
+ (io.opentelemetry.api.common AttributeKey Attributes)
(io.opentelemetry.api.trace Span StatusCode)
(io.opentelemetry.context Context)
(io.opentelemetry.exporter.otlp.trace OtlpGrpcSpanExporter)
+ (io.opentelemetry.sdk OpenTelemetrySdk)
(io.opentelemetry.sdk.resources Resource)
- (io.opentelemetry.sdk.trace SdkTracerProvider SdkTracer)
- (io.opentelemetry.sdk.trace.export BatchSpanProcessor SimpleSpanProcessor))
- (:gen-class))
+ (io.opentelemetry.sdk.trace SdkTracer SdkTracerProvider)
+ (io.opentelemetry.sdk.trace.export BatchSpanProcessor SimpleSpanProcessor)))
(def ^:dynamic *span* nil)