Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion components/providers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ThemeProvider } from "@/components/theme-provider"
import { PageViewTracker } from "@/components/page-view-tracker"
import { CookieBanner } from "@/components/cookie-banner"
import { Logger } from "@/components/logger"
import { RealtimeProvider } from "@/components/realtime-provider"
import { Toaster } from "@/components/ui/toaster"

interface ProvidersProps {
Expand All @@ -19,7 +20,7 @@ export function Providers({ children }: ProvidersProps) {
<ThemeProvider attribute="class" defaultTheme="system" enableSystem disableTransitionOnChange>
<Logger />
<PageViewTracker />
{children}
<RealtimeProvider>{children}</RealtimeProvider>
<Toaster />
<CookieBanner />
</ThemeProvider>
Expand Down
50 changes: 50 additions & 0 deletions components/realtime-provider.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"use client"

import type { ReactNode } from "react"
import { useEffect } from "react"
import { useRouter } from "next/navigation"
import { getRealtimeClient } from "@/lib/realtime-client"

interface RealtimeProviderProps {
children: ReactNode
}

export function RealtimeProvider({ children }: RealtimeProviderProps) {
const router = useRouter()

useEffect(() => {
const controller = new AbortController()

fetch("/api/socket/io", { signal: controller.signal }).catch(() => {
// The socket route will respond immediately once the server is ready.
})

const socket = getRealtimeClient()

if (!socket) {
controller.abort()
return
}

const handleEvent = () => {
router.refresh()
}

const handleConnect = () => {
router.refresh()
}

socket.on("connect", handleConnect)

socket.on("realtime:event", handleEvent)

return () => {
controller.abort()
socket.off("realtime:event", handleEvent)
socket.off("connect", handleConnect)
}
}, [router])

return <>{children}</>
}

6 changes: 6 additions & 0 deletions lib/db.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { PrismaClient as PrismaClientType } from "@prisma/client"

import { registerPrismaRealtime } from "@/lib/realtime-events"

type PrismaClientConstructor = new (...args: unknown[]) => PrismaClientType

const globalForPrisma = globalThis as unknown as { prisma?: PrismaClientType }
Expand Down Expand Up @@ -108,6 +110,10 @@ const prisma =
})
: createStubClient())

if (PrismaClientCtor) {
registerPrismaRealtime(prisma as PrismaClientType)
}

if (process.env.NODE_ENV !== "production") {
globalForPrisma.prisma = prisma
}
Expand Down
24 changes: 24 additions & 0 deletions lib/realtime-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"use client"

import { io, type Socket } from "socket.io-client"

let socketInstance: Socket | null = null

export function getRealtimeClient() {
if (typeof window === "undefined") {
return null
}

if (!socketInstance) {
socketInstance = io({
path: "/api/socket/io",
transports: ["websocket", "polling"],
autoConnect: true,
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
})
}

return socketInstance
}
55 changes: 55 additions & 0 deletions lib/realtime-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { PrismaClient } from "@prisma/client"

import { enqueueRealtimeEvent } from "@/lib/realtime-queue"

const MUTATION_ACTIONS = new Set([
"create",
"createMany",
"update",
"updateMany",
"upsert",
"delete",
"deleteMany",
])

const globalForRealtime = globalThis as unknown as {
prismaRealtimeRegistered?: boolean
}

export function registerPrismaRealtime(prisma: PrismaClient) {
if (globalForRealtime.prismaRealtimeRegistered) {
return
}

prisma.$use(async (params, next) => {

Check failure on line 24 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Parameter 'next' implicitly has an 'any' type.

Check failure on line 24 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Parameter 'params' implicitly has an 'any' type.
const result = await next(params)

if (MUTATION_ACTIONS.has(params.action)) {
const model = params.model ?? "unknown"

try {
await enqueueRealtimeEvent(`prisma:${model}:${params.action}`, {
model,
action: params.action,
args: params.args,
result,
timestamp: Date.now(),
})
Comment on lines +47 to +58
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid hanging Prisma writes when Redis is down

Each Prisma mutation awaits enqueueRealtimeEvent, and the Redis clients created in lib/redis.ts set maxRetriesPerRequest: null (infinite retries). When Redis is unavailable or misconfigured, queue.add never resolves nor rejects, so every database write waits forever before the middleware returns. This change effectively makes all writes dependent on Redis uptime even though the realtime queue is optional. Consider not awaiting the enqueue call or adding a bounded timeout/retry so mutations can complete even when Redis is offline.

Useful? React with 👍 / 👎.

} catch (error) {
console.error("Failed to enqueue realtime Prisma event", error)
}
}

return result
})

globalForRealtime.prismaRealtimeRegistered = true
}

export async function broadcastRealtimeEvent(event: string, payload: unknown) {
try {
await enqueueRealtimeEvent(event, payload)
} catch (error) {
console.error("Failed to enqueue realtime event", error)
}
}
83 changes: 83 additions & 0 deletions lib/realtime-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Queue, Worker } from "bullmq"
import type { JobsOptions } from "bullmq"

import { createRedisConnection, getRedisUrl } from "@/lib/redis"
import { getRealtimeIO } from "@/lib/realtime"

const queueName = "realtime-events"

const globalForQueue = globalThis as unknown as {
realtimeQueue?: Queue
realtimeWorker?: Worker
realtimeQueueConnection?: ReturnType<typeof createRedisConnection>
realtimeWorkerConnection?: ReturnType<typeof createRedisConnection>
}

function ensureConnections() {
if (!globalForQueue.realtimeQueueConnection) {
globalForQueue.realtimeQueueConnection = createRedisConnection()
}

if (!globalForQueue.realtimeWorkerConnection) {
globalForQueue.realtimeWorkerConnection = createRedisConnection()
}
}

export function getRealtimeQueue() {
ensureConnections()

if (!globalForQueue.realtimeQueue) {
globalForQueue.realtimeQueue = new Queue(queueName, {
connection: globalForQueue.realtimeQueueConnection,

Check failure on line 31 in lib/realtime-queue.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Type 'Redis | undefined' is not assignable to type 'ConnectionOptions'.
})
}

return globalForQueue.realtimeQueue
}

function ensureWorker() {
ensureConnections()

if (!globalForQueue.realtimeWorker) {
globalForQueue.realtimeWorker = new Worker(
queueName,
async job => {
const io = getRealtimeIO()

if (io) {
io.emit("realtime:event", {
event: job.name,
payload: job.data,
jobId: job.id,
timestamp: Date.now(),
})

io.emit(job.name, job.data)
}
},
{ connection: globalForQueue.realtimeWorkerConnection },

Check failure on line 58 in lib/realtime-queue.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Type 'Redis | undefined' is not assignable to type 'ConnectionOptions'.
)

globalForQueue.realtimeWorker.on("error", error => {
console.error("Realtime worker error", error)
})
}
}

export async function enqueueRealtimeEvent(event: string, payload: unknown, options?: JobsOptions) {
const queue = getRealtimeQueue()
ensureWorker()

await queue.add(event, payload, {
removeOnComplete: { age: 60, count: 1000 },
removeOnFail: { age: 60 * 60, count: 100 },
...options,
})
}

export function describeRealtimeQueue() {
return {
name: queueName,
redis: getRedisUrl(),
}
}
14 changes: 14 additions & 0 deletions lib/realtime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { Server as IOServer } from "socket.io"

const globalForRealtime = globalThis as unknown as {
realtimeIO?: IOServer
}

export function getRealtimeIO() {
return globalForRealtime.realtimeIO
}

export function setRealtimeIO(io: IOServer) {
globalForRealtime.realtimeIO = io
return io
}
28 changes: 28 additions & 0 deletions lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import IORedis from "ioredis"

const redisUrl = process.env.REDIS_URL || "redis://127.0.0.1:6379"

const baseOptions = {
maxRetriesPerRequest: null as number | null,
enableReadyCheck: true,
}

const globalForRedis = globalThis as unknown as {
redisConnection?: IORedis
}

export function createRedisConnection() {
return new IORedis(redisUrl, baseOptions)
}

export function getRedisConnection() {
if (!globalForRedis.redisConnection) {
globalForRedis.redisConnection = createRedisConnection()
}

return globalForRedis.redisConnection
}

export function getRedisUrl() {
return redisUrl
}
1 change: 1 addition & 0 deletions next-env.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// <reference types="next" />
/// <reference types="next/image-types/global" />
/// <reference types="next/navigation-types/compat/navigation" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/app/api-reference/config/typescript for more information.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"@react-three/fiber": "^9.0.0",
"autoprefixer": "^10.4.20",
"bcryptjs": "3.0.2",
"bullmq": "^5.63.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "1.0.4",
Expand All @@ -67,6 +68,7 @@
"framer-motion": "^11.11.7",
"immer": "10.1.3",
"input-otp": "1.4.1",
"ioredis": "^5.8.2",
"lucide-react": "^0.454.0",
"next": "15.4.7",
"next-auth": "4.24.13",
Expand All @@ -80,6 +82,8 @@
"react-resizable-panels": "^2.1.7",
"react-syntax-highlighter": "^16.1.0",
"recharts": "2.15.0",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"sonner": "^1.7.1",
"swr": "^2.3.3",
"tailwind-merge": "^2.5.5",
Expand Down
48 changes: 48 additions & 0 deletions pages/api/socket/io.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import type { NextApiRequest } from "next"
import { Server as IOServer } from "socket.io"

import type { NextApiResponseServerIO } from "@/types/next"
import { getRealtimeIO, setRealtimeIO } from "@/lib/realtime"

export const config = {
api: {
bodyParser: false,
},
}

export default function handler(req: NextApiRequest, res: NextApiResponseServerIO) {
const existingServer = res.socket.server as typeof res.socket.server & { io?: IOServer }

if (!existingServer.io) {
const io = new IOServer(res.socket.server, {
path: "/api/socket/io",
cors: {
origin: process.env.NEXT_PUBLIC_SITE_URL || "*",
},
})

setRealtimeIO(io)
existingServer.io = io

io.on("connection", socket => {
socket.emit("realtime:event", {
event: "socket:connected",
socketId: socket.id,
timestamp: Date.now(),
})

socket.on("disconnect", reason => {
socket.broadcast.emit("realtime:event", {
event: "socket:disconnected",
socketId: socket.id,
reason,
timestamp: Date.now(),
})
})
})
} else if (!getRealtimeIO()) {
setRealtimeIO(existingServer.io)
}

res.end()
}
Loading
Loading