Skip to content

Commit bbadd60

Browse files
committed
feat(core): Add Supabase Queues support
1 parent 33c2a66 commit bbadd60

File tree

5 files changed

+277
-4
lines changed

5 files changed

+277
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://[email protected]/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues.rpc('enqueue', {
22+
queue_name: 'todos',
23+
msg: { title: 'Test Todo' },
24+
});
25+
26+
await queues.rpc('dequeue', {
27+
queue_name: 'todos',
28+
});
29+
} catch (error) {
30+
Sentry.captureException(error);
31+
}
32+
}
33+
34+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import type { Page} from '@playwright/test';
2+
import { expect } from '@playwright/test';
3+
import type { Event } from '@sentry/core';
4+
5+
import { sentryTest } from '../../../../utils/fixtures';
6+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
7+
8+
async function mockSupabaseRoute(page: Page) {
9+
await page.route('**/rest/v1/rpc**', route => {
10+
return route.fulfill({
11+
status: 200,
12+
body: JSON.stringify({
13+
foo: ['bar', 'baz'],
14+
}),
15+
headers: {
16+
'Content-Type': 'application/json',
17+
},
18+
});
19+
});
20+
}
21+
22+
sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
23+
await mockSupabaseRoute(page);
24+
25+
if (shouldSkipTracingTest()) {
26+
return;
27+
}
28+
29+
const url = await getLocalTestUrl({ testDir: __dirname });
30+
31+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
32+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
33+
34+
expect(queueSpans).toHaveLength(2);
35+
36+
expect(queueSpans![0]).toMatchObject({
37+
description: 'supabase.db.rpc',
38+
parent_span_id: event.contexts?.trace?.span_id,
39+
span_id: expect.any(String),
40+
start_timestamp: expect.any(Number),
41+
timestamp: expect.any(Number),
42+
trace_id: event.contexts?.trace?.trace_id,
43+
data: expect.objectContaining({
44+
'sentry.op': 'queue.publish',
45+
'sentry.origin': 'auto.db.supabase',
46+
'messaging.destination.name': 'todos',
47+
'messaging.message.id': 'Test Todo',
48+
}),
49+
});
50+
51+
expect(queueSpans![1]).toMatchObject({
52+
description: 'supabase.db.rpc',
53+
parent_span_id: event.contexts?.trace?.span_id,
54+
span_id: expect.any(String),
55+
start_timestamp: expect.any(Number),
56+
timestamp: expect.any(Number),
57+
trace_id: event.contexts?.trace?.trace_id,
58+
data: expect.objectContaining({
59+
'sentry.op': 'queue.process',
60+
'sentry.origin': 'auto.db.supabase',
61+
'messaging.destination.name': 'todos',
62+
}),
63+
});
64+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import * as Sentry from '@sentry/browser';
2+
3+
import { createClient } from '@supabase/supabase-js';
4+
window.Sentry = Sentry;
5+
6+
const queues = createClient('https://test.supabase.co', 'test-key', {
7+
db: {
8+
schema: 'pgmq_public',
9+
},
10+
});
11+
12+
Sentry.init({
13+
dsn: 'https://[email protected]/1337',
14+
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)],
15+
tracesSampleRate: 1.0,
16+
});
17+
18+
// Simulate queue operations
19+
async function performQueueOperations() {
20+
try {
21+
await queues
22+
.schema('pgmq_public')
23+
.rpc('enqueue', {
24+
queue_name: 'todos',
25+
msg: { title: 'Test Todo' },
26+
});
27+
28+
await queues
29+
.schema('pgmq_public')
30+
.rpc('dequeue', {
31+
queue_name: 'todos',
32+
});
33+
} catch (error) {
34+
Sentry.captureException(error);
35+
}
36+
}
37+
38+
performQueueOperations();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { type Page, expect } from '@playwright/test';
2+
import type { Event } from '@sentry/core';
3+
4+
import { sentryTest } from '../../../../utils/fixtures';
5+
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';
6+
7+
async function mockSupabaseRoute(page: Page) {
8+
await page.route('**/rest/v1/rpc**', route => {
9+
return route.fulfill({
10+
status: 200,
11+
body: JSON.stringify({
12+
foo: ['bar', 'baz'],
13+
}),
14+
headers: {
15+
'Content-Type': 'application/json',
16+
},
17+
});
18+
});
19+
}
20+
21+
sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
22+
await mockSupabaseRoute(page);
23+
24+
if (shouldSkipTracingTest()) {
25+
return;
26+
}
27+
28+
const url = await getLocalTestUrl({ testDir: __dirname });
29+
30+
const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
31+
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue'));
32+
33+
expect(queueSpans).toHaveLength(2);
34+
35+
expect(queueSpans![0]).toMatchObject({
36+
description: 'supabase.db.rpc',
37+
parent_span_id: event.contexts?.trace?.span_id,
38+
span_id: expect.any(String),
39+
start_timestamp: expect.any(Number),
40+
timestamp: expect.any(Number),
41+
trace_id: event.contexts?.trace?.trace_id,
42+
data: expect.objectContaining({
43+
'sentry.op': 'queue.publish',
44+
'sentry.origin': 'auto.db.supabase',
45+
'messaging.destination.name': 'todos',
46+
'messaging.message.id': 'Test Todo',
47+
}),
48+
});
49+
50+
expect(queueSpans![1]).toMatchObject({
51+
description: 'supabase.db.rpc',
52+
parent_span_id: event.contexts?.trace?.span_id,
53+
span_id: expect.any(String),
54+
start_timestamp: expect.any(Number),
55+
timestamp: expect.any(Number),
56+
trace_id: event.contexts?.trace?.trace_id,
57+
data: expect.objectContaining({
58+
'sentry.op': 'queue.process',
59+
'sentry.origin': 'auto.db.supabase',
60+
'messaging.destination.name': 'todos',
61+
}),
62+
});
63+
});

packages/core/src/integrations/supabase.ts

+78-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import { SPAN_STATUS_ERROR, SPAN_STATUS_OK } from '../tracing';
1515
export interface SupabaseClientConstructor {
1616
prototype: {
1717
from: (table: string) => PostgrestQueryBuilder;
18+
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
1819
};
20+
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
1921
}
2022

2123
const AUTH_OPERATIONS_TO_INSTRUMENT = [
@@ -192,6 +194,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string
192194
return `${method}(${key}, ${value.join('.')})`;
193195
}
194196

197+
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
198+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
199+
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
200+
{
201+
apply(target, thisArg, argumentsList) {
202+
const rv = Reflect.apply(target, thisArg, argumentsList);
203+
204+
return instrumentRpc(rv);
205+
},
206+
},
207+
);
208+
}
209+
210+
function instrumentRpc(SupabaseClient: unknown): unknown {
211+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy(
212+
(SupabaseClient as unknown as SupabaseClientConstructor).rpc,
213+
{
214+
apply(target, thisArg, argumentsList) {
215+
const isProducerSpan = argumentsList[0] === 'enqueue';
216+
const isConsumerSpan = argumentsList[0] === 'dequeue';
217+
218+
const maybeQueueParams = argumentsList[1];
219+
220+
// If the second argument is not an object, it's not a queue operation
221+
if (!isPlainObject(maybeQueueParams)) {
222+
return Reflect.apply(target, thisArg, argumentsList);
223+
}
224+
225+
const msg = maybeQueueParams?.msg as { title: string };
226+
227+
const messageId = msg?.title;
228+
const queueName = maybeQueueParams?.queue_name as string;
229+
230+
const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : '';
231+
232+
// If the operation is not a queue operation, return the original function
233+
if (!op) {
234+
return Reflect.apply(target, thisArg, argumentsList);
235+
}
236+
237+
return startSpan(
238+
{
239+
name: 'supabase.db.rpc',
240+
attributes: {
241+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
242+
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
243+
},
244+
},
245+
async span => {
246+
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => {
247+
if (messageId) {
248+
span.setAttribute('messaging.message.id', messageId);
249+
}
250+
251+
if (queueName) {
252+
span.setAttribute('messaging.destination.name', queueName);
253+
}
254+
255+
span.end();
256+
return res;
257+
});
258+
},
259+
);
260+
},
261+
},
262+
);
263+
264+
return SupabaseClient;
265+
}
266+
195267
function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn {
196268
return new Proxy(operation, {
197269
apply(target, thisArg, argumentsList) {
@@ -261,13 +333,13 @@ function instrumentSupabaseAuthClient(supabaseClientInstance: SupabaseClientInst
261333
});
262334
}
263335

264-
function instrumentSupabaseClientConstructor(SupabaseClient: unknown): void {
265-
if (instrumented.has(SupabaseClient)) {
336+
function instrumentSupabaseClientConstructor(SupabaseClientConstructor: unknown): void {
337+
if (instrumented.has(SupabaseClientConstructor)) {
266338
return;
267339
}
268340

269-
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.from = new Proxy(
270-
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.from,
341+
(SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from = new Proxy(
342+
(SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from,
271343
{
272344
apply(target, thisArg, argumentsList) {
273345
const rv = Reflect.apply(target, thisArg, argumentsList);
@@ -482,6 +554,8 @@ const instrumentSupabase = (supabaseClientInstance: unknown): void => {
482554
supabaseClientInstance.constructor === Function ? supabaseClientInstance : supabaseClientInstance.constructor;
483555

484556
instrumentSupabaseClientConstructor(SupabaseClientConstructor);
557+
instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
558+
instrumentRpc(supabaseClientInstance as SupabaseClientInstance);
485559
instrumentSupabaseAuthClient(supabaseClientInstance as SupabaseClientInstance);
486560
};
487561

0 commit comments

Comments
 (0)