Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lib/realtime-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@ import type { PrismaClient } from "@prisma/client"

import { enqueueRealtimeEvent } from "@/lib/realtime-queue"

type PrismaMiddleware = Parameters<PrismaClient["$use"]>[0]
type PrismaMiddlewareParams = Parameters<PrismaMiddleware>[0]
type PrismaMiddlewareNext = Parameters<PrismaMiddleware>[1]
type PrismaMiddlewareParams = {
action: string
model?: string | null
args?: unknown
[key: string]: unknown
}

type PrismaMiddlewareNext = (params: PrismaMiddlewareParams) => Promise<unknown>

type PrismaMiddleware = (
params: PrismaMiddlewareParams,
next: PrismaMiddlewareNext,
) => Promise<unknown>

const MUTATION_ACTIONS = new Set([
"create",
Expand Down
6 changes: 3 additions & 3 deletions lib/realtime-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Queue, Worker } from "bullmq"
import type { JobsOptions } from "bullmq"
import type { Job, JobsOptions } from "bullmq"

import { createRedisConnection, getRedisUrl } from "@/lib/redis"
import { getRealtimeIO } from "@/lib/realtime"
Expand Down Expand Up @@ -47,7 +47,7 @@ function ensureWorker() {
if (!globalForQueue.realtimeWorker) {
globalForQueue.realtimeWorker = new Worker(
queueName,
async job => {
async (job: Job) => {
const io = getRealtimeIO()

if (io) {
Expand All @@ -64,7 +64,7 @@ function ensureWorker() {
{ connection },
)

globalForQueue.realtimeWorker.on("error", error => {
globalForQueue.realtimeWorker.on("error", (error: Error) => {
console.error("Realtime worker error", error)
})
}
Expand Down
5 changes: 3 additions & 2 deletions pages/api/socket/io.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { NextApiRequest } from "next"
import { Server as IOServer } from "socket.io"
import type { DisconnectReason, Socket } from "socket.io"

import type { NextApiResponseServerIO } from "@/types/next"
import { getRealtimeIO, setRealtimeIO } from "@/lib/realtime"
Expand All @@ -24,14 +25,14 @@ export default function handler(req: NextApiRequest, res: NextApiResponseServerI
setRealtimeIO(io)
existingServer.io = io

io.on("connection", socket => {
io.on("connection", (socket: Socket) => {
socket.emit("realtime:event", {
event: "socket:connected",
socketId: socket.id,
timestamp: Date.now(),
})

socket.on("disconnect", reason => {
socket.on("disconnect", (reason: DisconnectReason) => {
socket.broadcast.emit("realtime:event", {
event: "socket:disconnected",
socketId: socket.id,
Expand Down
103 changes: 103 additions & 0 deletions types/external.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
declare module "socket.io-client" {
export interface Socket {
id: string
emit(event: string, ...args: unknown[]): void
on(event: string, listener: (...args: unknown[]) => void): this
off(event: string, listener: (...args: unknown[]) => void): this
connect(): void
disconnect(): void
broadcast?: {
emit(event: string, ...args: unknown[]): void
}
}

export interface SocketOptions {
path?: string
transports?: string[]
autoConnect?: boolean
reconnection?: boolean
reconnectionDelay?: number
reconnectionDelayMax?: number
}

export function io(opts?: SocketOptions): Socket
export function io(uri: string, opts?: SocketOptions): Socket
export { io }
export default io
}

declare module "socket.io" {
import type { Server as HTTPServer } from "http"

export interface ServerOptions {
path?: string
cors?: {
origin?: string | string[]
}
}

export interface BroadcastOperator {
emit(event: string, ...args: unknown[]): void
}

export type DisconnectReason = string

export interface Socket {
id: string
emit(event: string, ...args: unknown[]): void
on(event: "disconnect", listener: (reason: DisconnectReason) => void): this
on(event: string, listener: (...args: unknown[]) => void): this
broadcast: BroadcastOperator
}

export class Server {
constructor(httpServer?: HTTPServer, opts?: ServerOptions)
emit(event: string, ...args: unknown[]): boolean
on(event: "connection", listener: (socket: Socket) => void): this
on(event: string, listener: (...args: unknown[]) => void): this
}

export { Server as IOServer }
}

declare module "bullmq" {
export interface JobsOptions {
removeOnComplete?: boolean | { age?: number; count?: number }
removeOnFail?: boolean | { age?: number; count?: number }
[key: string]: unknown
}

export interface QueueOptions {
connection?: unknown
}

export interface WorkerOptions {
connection?: unknown
}

export interface Job<Data = unknown> {
id?: string | number
name: string
data: Data
}

export class Queue<Data = unknown, Result = unknown, Name extends string = string> {
constructor(name: Name, opts?: QueueOptions)
add(name: Name, data: Data, opts?: JobsOptions): Promise<Job<Data>>
}

export class Worker<Data = unknown, Name extends string = string> {
constructor(name: Name, processor: (job: Job<Data>) => Promise<unknown>, opts?: WorkerOptions)
on(event: "error", handler: (error: Error) => void): this
}
}

declare module "ioredis" {
export interface RedisOptions {
[key: string]: unknown
}

export default class IORedis {
constructor(url?: string, options?: RedisOptions)
}
}