From 13d60e0a7d0c4979be75c77dd40af0f4b3ac465f Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Fri, 28 Mar 2025 18:41:54 +0000 Subject: [PATCH] feat(core): Add Supabase Queues support --- .../integrations/supabase/queues-rpc/init.js | 34 +++++++ .../integrations/supabase/queues-rpc/test.ts | 64 +++++++++++++ .../supabase/queues-schema/init.js | 38 ++++++++ .../supabase/queues-schema/test.ts | 63 +++++++++++++ packages/core/src/integrations/supabase.ts | 94 +++++++++++++++++-- 5 files changed, 283 insertions(+), 10 deletions(-) create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js new file mode 100644 index 000000000000..7b0fdb096ebc --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js @@ -0,0 +1,34 @@ +import * as Sentry from '@sentry/browser'; + +import { createClient } from '@supabase/supabase-js'; +window.Sentry = Sentry; + +const queues = createClient('https://test.supabase.co', 'test-key', { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + tracesSampleRate: 1.0, +}); + +// Simulate queue operations +async function performQueueOperations() { + try { + await queues.rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); + + await queues.rpc('dequeue', { + queue_name: 'todos', + }); + } catch (error) { + Sentry.captureException(error); + } +} + +performQueueOperations(); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts new file mode 100644 index 000000000000..8b6ee89e9f81 --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts @@ -0,0 +1,64 @@ +import type { Page} from '@playwright/test'; +import { expect } from '@playwright/test'; +import type { Event } from '@sentry/core'; + +import { sentryTest } from '../../../../utils/fixtures'; +import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; + +async function mockSupabaseRoute(page: Page) { + await page.route('**/rest/v1/rpc**', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify({ + foo: ['bar', 'baz'], + }), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); +} + +sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => { + await mockSupabaseRoute(page); + + if (shouldSkipTracingTest()) { + return; + } + + const url = await getLocalTestUrl({ testDir: __dirname }); + + const event = await getFirstSentryEnvelopeRequest(page, url); + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + + expect(queueSpans).toHaveLength(2); + + expect(queueSpans![0]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + 'messaging.message.id': 'Test Todo', + }), + }); + + expect(queueSpans![1]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + }), + }); +}); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js new file mode 100644 index 000000000000..43c50357f1eb --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js @@ -0,0 +1,38 @@ +import * as Sentry from '@sentry/browser'; + +import { createClient } from '@supabase/supabase-js'; +window.Sentry = Sentry; + +const queues = createClient('https://test.supabase.co', 'test-key', { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + tracesSampleRate: 1.0, +}); + +// Simulate queue operations +async function performQueueOperations() { + try { + await queues + .schema('pgmq_public') + .rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); + + await queues + .schema('pgmq_public') + .rpc('dequeue', { + queue_name: 'todos', + }); + } catch (error) { + Sentry.captureException(error); + } +} + +performQueueOperations(); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts new file mode 100644 index 000000000000..8070a1b17357 --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts @@ -0,0 +1,63 @@ +import { type Page, expect } from '@playwright/test'; +import type { Event } from '@sentry/core'; + +import { sentryTest } from '../../../../utils/fixtures'; +import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; + +async function mockSupabaseRoute(page: Page) { + await page.route('**/rest/v1/rpc**', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify({ + foo: ['bar', 'baz'], + }), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); +} + +sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => { + await mockSupabaseRoute(page); + + if (shouldSkipTracingTest()) { + return; + } + + const url = await getLocalTestUrl({ testDir: __dirname }); + + const event = await getFirstSentryEnvelopeRequest(page, url); + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + + expect(queueSpans).toHaveLength(2); + + expect(queueSpans![0]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + 'messaging.message.id': 'Test Todo', + }), + }); + + expect(queueSpans![1]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + }), + }); +}); diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index 85ca8a50b6ed..9ac1ea7a0d5d 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -12,6 +12,14 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from ' import { captureException } from '../exports'; import { SPAN_STATUS_ERROR, SPAN_STATUS_OK } from '../tracing'; +export interface SupabaseClientConstructor { + prototype: { + from: (table: string) => PostgrestQueryBuilder; + schema: (schema: string) => { rpc: (...args: unknown[]) => Promise }; + }; + rpc: (fn: string, params: Record) => Promise; +} + const AUTH_OPERATIONS_TO_INSTRUMENT = [ 'reauthenticate', 'signInAnonymously', @@ -114,12 +122,6 @@ export interface SupabaseBreadcrumb { }; } -export interface SupabaseClientConstructor { - prototype: { - from: (table: string) => PostgrestQueryBuilder; - }; -} - export interface PostgrestProtoThenable { then: ( onfulfilled?: ((value: T) => T | PromiseLike) | null, @@ -197,6 +199,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string return `${method}(${key}, ${value.join('.')})`; } +function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { + (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, + { + apply(target, thisArg, argumentsList) { + const rv = Reflect.apply(target, thisArg, argumentsList); + + return instrumentRpc(rv); + }, + }, + ); +} + +function instrumentRpc(SupabaseClient: unknown): unknown { + (SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructor).rpc, + { + apply(target, thisArg, argumentsList) { + const isProducerSpan = argumentsList[0] === 'enqueue'; + const isConsumerSpan = argumentsList[0] === 'dequeue'; + + const maybeQueueParams = argumentsList[1]; + + // If the second argument is not an object, it's not a queue operation + if (!isPlainObject(maybeQueueParams)) { + return Reflect.apply(target, thisArg, argumentsList); + } + + const msg = maybeQueueParams?.msg as { title: string }; + + const messageId = msg?.title; + const queueName = maybeQueueParams?.queue_name as string; + + const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : ''; + + // If the operation is not a queue operation, return the original function + if (!op) { + return Reflect.apply(target, thisArg, argumentsList); + } + + return startSpan( + { + name: 'supabase.db.rpc', + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, + }, + }, + async span => { + return (Reflect.apply(target, thisArg, argumentsList) as Promise).then((res: unknown) => { + if (messageId) { + span.setAttribute('messaging.message.id', messageId); + } + + if (queueName) { + span.setAttribute('messaging.destination.name', queueName); + } + + span.end(); + return res; + }); + }, + ); + }, + }, + ); + + return SupabaseClient; +} + function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn { return new Proxy(operation, { apply(target, thisArg, argumentsList) { @@ -266,13 +338,13 @@ function instrumentSupabaseAuthClient(supabaseClientInstance: SupabaseClientInst }); } -function instrumentSupabaseClientConstructor(SupabaseClient: unknown): void { - if (instrumented.has(SupabaseClient)) { +function instrumentSupabaseClientConstructor(SupabaseClientConstructor: unknown): void { + if (instrumented.has(SupabaseClientConstructor)) { return; } - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.from = new Proxy( - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.from, + (SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from = new Proxy( + (SupabaseClientConstructor as unknown as SupabaseClientConstructor).prototype.from, { apply(target, thisArg, argumentsList) { const rv = Reflect.apply(target, thisArg, argumentsList); @@ -466,6 +538,8 @@ const instrumentSupabase = (supabaseClientInstance: unknown): void => { supabaseClientInstance.constructor === Function ? supabaseClientInstance : supabaseClientInstance.constructor; instrumentSupabaseClientConstructor(SupabaseClientConstructor); + instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor); + instrumentRpc(supabaseClientInstance as SupabaseClientInstance); instrumentSupabaseAuthClient(supabaseClientInstance as SupabaseClientInstance); };