Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions components/dashboard-nav.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"

export function DashboardNav() {
const pathname = usePathname()
const currentPath = pathname ?? ""
const router = useRouter()
const { toast } = useToast()
const { setTheme, theme } = useTheme()
Expand Down Expand Up @@ -69,10 +70,10 @@ export function DashboardNav() {
]

const isActive = (path: string) => {
if (path === "/dashboard" && pathname === "/dashboard") {
if (path === "/dashboard" && currentPath === "/dashboard") {
return true
}
return path !== "/dashboard" && pathname.startsWith(path)
return path !== "/dashboard" && currentPath.startsWith(path)
}

const handleLogout = async () => {
Expand Down
5 changes: 3 additions & 2 deletions components/dashboard-shell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ interface DashboardShellProps {
export function DashboardShell({ children }: DashboardShellProps) {
const router = useRouter()
const pathname = usePathname()
const currentPath = pathname ?? ""
const [isMobile, setIsMobile] = useState(false)
const [isOpen, setIsOpen] = useState(false)

Expand All @@ -58,10 +59,10 @@ export function DashboardShell({ children }: DashboardShellProps) {

// Check if the menu item should be active
const isActive = (path: string) => {
if (path === "/dashboard" && pathname === "/dashboard") {
if (path === "/dashboard" && currentPath === "/dashboard") {
return true
}
return path !== "/dashboard" && pathname.startsWith(path)
return path !== "/dashboard" && currentPath.startsWith(path)
}

const handleLogout = async () => {
Expand Down
3 changes: 2 additions & 1 deletion components/providers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ThemeProvider } from "@/components/theme-provider"
import { PageViewTracker } from "@/components/page-view-tracker"
import { CookieBanner } from "@/components/cookie-banner"
import { Logger } from "@/components/logger"
import { RealtimeProvider } from "@/components/realtime-provider"
import { Toaster } from "@/components/ui/toaster"

interface ProvidersProps {
Expand All @@ -19,7 +20,7 @@ export function Providers({ children }: ProvidersProps) {
<ThemeProvider attribute="class" defaultTheme="system" enableSystem disableTransitionOnChange>
<Logger />
<PageViewTracker />
{children}
<RealtimeProvider>{children}</RealtimeProvider>
<Toaster />
<CookieBanner />
</ThemeProvider>
Expand Down
50 changes: 50 additions & 0 deletions components/realtime-provider.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"use client"

import type { ReactNode } from "react"
import { useEffect } from "react"
import { useRouter } from "next/navigation"
import { getRealtimeClient } from "@/lib/realtime-client"

interface RealtimeProviderProps {
children: ReactNode
}

export function RealtimeProvider({ children }: RealtimeProviderProps) {
const router = useRouter()

useEffect(() => {
const controller = new AbortController()

fetch("/api/socket/io", { signal: controller.signal }).catch(() => {
// The socket route will respond immediately once the server is ready.
})

const socket = getRealtimeClient()

if (!socket) {
controller.abort()
return
}

const handleEvent = () => {
router.refresh()
}

const handleConnect = () => {
router.refresh()
}

socket.on("connect", handleConnect)

socket.on("realtime:event", handleEvent)

return () => {
controller.abort()
socket.off("realtime:event", handleEvent)
socket.off("connect", handleConnect)
}
}, [router])

return <>{children}</>
}

2 changes: 1 addition & 1 deletion components/unsubscribe-client.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {

export default function UnsubscribeClient() {
const searchParams = useSearchParams()
const token = searchParams.get("token")
const token = searchParams?.get("token") ?? null
const [status, setStatus] = useState<
"verifying" | "ready" | "processing" | "success" | "error" | "preferences"
>(token ? "verifying" : "error")
Expand Down
6 changes: 6 additions & 0 deletions lib/db.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { PrismaClient as PrismaClientType } from "@prisma/client"

import { registerPrismaRealtime } from "@/lib/realtime-events"

type PrismaClientConstructor = new (...args: unknown[]) => PrismaClientType

const globalForPrisma = globalThis as unknown as { prisma?: PrismaClientType }
Expand Down Expand Up @@ -108,6 +110,10 @@ const prisma =
})
: createStubClient())

if (PrismaClientCtor) {
registerPrismaRealtime(prisma as PrismaClientType)
}

if (process.env.NODE_ENV !== "production") {
globalForPrisma.prisma = prisma
}
Expand Down
24 changes: 24 additions & 0 deletions lib/realtime-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"use client"

import { io, type Socket } from "socket.io-client"

let socketInstance: Socket | null = null

export function getRealtimeClient() {
if (typeof window === "undefined") {
return null
}

if (!socketInstance) {
socketInstance = io({
path: "/api/socket/io",
transports: ["websocket", "polling"],
autoConnect: true,
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
})
}

return socketInstance
}
78 changes: 78 additions & 0 deletions lib/realtime-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import type { PrismaClient } from "@prisma/client"

import { enqueueRealtimeEvent } from "@/lib/realtime-queue"

type PrismaMiddleware = Parameters<PrismaClient["$use"]>[0]
type PrismaMiddlewareParams = Parameters<PrismaMiddleware>[0]

Check failure on line 6 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Type 'unknown' does not satisfy the constraint '(...args: any) => any'.
type PrismaMiddlewareNext = Parameters<PrismaMiddleware>[1]

Check failure on line 7 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Type 'unknown' does not satisfy the constraint '(...args: any) => any'.

const MUTATION_ACTIONS = new Set([
"create",
"createMany",
"update",
"updateMany",
"upsert",
"delete",
"deleteMany",
])

const globalForRealtime = globalThis as unknown as {
prismaRealtimeRegistered?: boolean
}

export function registerPrismaRealtime(prisma: PrismaClient) {
if (globalForRealtime.prismaRealtimeRegistered) {
return
}

const middlewareCapablePrisma = prisma as PrismaClient & {
$use?: PrismaClient["$use"]
}

if (typeof middlewareCapablePrisma.$use !== "function") {
console.warn("Prisma client does not support middleware; skipping realtime registration")
globalForRealtime.prismaRealtimeRegistered = true
return
}

const realtimeMiddleware: PrismaMiddleware = async (
params: PrismaMiddlewareParams,
next: PrismaMiddlewareNext,
) => {
const result = await next(params)

Check failure on line 42 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

This expression is not callable.

if (MUTATION_ACTIONS.has(params.action)) {

Check failure on line 44 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Property 'action' does not exist on type 'never'.
const model = params.model ?? "unknown"

Check failure on line 45 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Property 'model' does not exist on type 'never'.

try {
const enqueuePromise = enqueueRealtimeEvent(`prisma:${model}:${params.action}`, {

Check failure on line 48 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Property 'action' does not exist on type 'never'.
model,
action: params.action,

Check failure on line 50 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Property 'action' does not exist on type 'never'.
args: params.args,

Check failure on line 51 in lib/realtime-events.ts

View workflow job for this annotation

GitHub Actions / Lint, typecheck, and build

Property 'args' does not exist on type 'never'.
result,
timestamp: Date.now(),
})

void enqueuePromise.catch(error => {
console.error("Failed to enqueue realtime Prisma event", error)
})
} catch (error) {
console.error("Failed to enqueue realtime Prisma event", error)
}
}

return result
}

middlewareCapablePrisma.$use(realtimeMiddleware)

globalForRealtime.prismaRealtimeRegistered = true
}

export async function broadcastRealtimeEvent(event: string, payload: unknown) {
try {
await enqueueRealtimeEvent(event, payload)
} catch (error) {
console.error("Failed to enqueue realtime event", error)
}
}
89 changes: 89 additions & 0 deletions lib/realtime-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Queue, Worker } from "bullmq"
import type { JobsOptions } from "bullmq"

import { createRedisConnection, getRedisUrl } from "@/lib/redis"
import { getRealtimeIO } from "@/lib/realtime"

const queueName = "realtime-events"

const globalForQueue = globalThis as unknown as {
realtimeQueue?: Queue
realtimeWorker?: Worker
realtimeQueueConnection?: ReturnType<typeof createRedisConnection>
realtimeWorkerConnection?: ReturnType<typeof createRedisConnection>
}

function getQueueConnection() {
if (!globalForQueue.realtimeQueueConnection) {
globalForQueue.realtimeQueueConnection = createRedisConnection()
}

return globalForQueue.realtimeQueueConnection
}

function getWorkerConnection() {
if (!globalForQueue.realtimeWorkerConnection) {
globalForQueue.realtimeWorkerConnection = createRedisConnection()
}

return globalForQueue.realtimeWorkerConnection
}

export function getRealtimeQueue() {
const connection = getQueueConnection()

if (!globalForQueue.realtimeQueue) {
globalForQueue.realtimeQueue = new Queue(queueName, {
connection,
})
}

return globalForQueue.realtimeQueue
}

function ensureWorker() {
const connection = getWorkerConnection()

if (!globalForQueue.realtimeWorker) {
globalForQueue.realtimeWorker = new Worker(
queueName,
async job => {
const io = getRealtimeIO()

if (io) {
io.emit("realtime:event", {
event: job.name,
payload: job.data,
jobId: job.id,
timestamp: Date.now(),
})

io.emit(job.name, job.data)
}
},
{ connection },
)

globalForQueue.realtimeWorker.on("error", error => {
console.error("Realtime worker error", error)
})
}
}

export async function enqueueRealtimeEvent(event: string, payload: unknown, options?: JobsOptions) {
const queue = getRealtimeQueue()
ensureWorker()

await queue.add(event, payload, {
removeOnComplete: { age: 60, count: 1000 },
removeOnFail: { age: 60 * 60, count: 100 },
...options,
})
}

export function describeRealtimeQueue() {
return {
name: queueName,
redis: getRedisUrl(),
}
}
14 changes: 14 additions & 0 deletions lib/realtime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { Server as IOServer } from "socket.io"

const globalForRealtime = globalThis as unknown as {
realtimeIO?: IOServer
}

export function getRealtimeIO() {
return globalForRealtime.realtimeIO
}

export function setRealtimeIO(io: IOServer) {
globalForRealtime.realtimeIO = io
return io
}
28 changes: 28 additions & 0 deletions lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import IORedis from "ioredis"

const redisUrl = process.env.REDIS_URL || "redis://127.0.0.1:6379"

const baseOptions = {
maxRetriesPerRequest: null as number | null,
enableReadyCheck: true,
}

const globalForRedis = globalThis as unknown as {
redisConnection?: IORedis
}

export function createRedisConnection() {
return new IORedis(redisUrl, baseOptions)
}

export function getRedisConnection() {
if (!globalForRedis.redisConnection) {
globalForRedis.redisConnection = createRedisConnection()
}

return globalForRedis.redisConnection
}

export function getRedisUrl() {
return redisUrl
}
1 change: 1 addition & 0 deletions next-env.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// <reference types="next" />
/// <reference types="next/image-types/global" />
/// <reference types="next/navigation-types/compat/navigation" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/app/api-reference/config/typescript for more information.
Loading
Loading