Skip to content

Commit b38405c

Browse files
authored
Realtime and task run performance improvements (#2158)
* Add createdAt filter to realtime subscribing with tags * Filter realtime colums and expose ability to skip some columns * Add sharding support for electric * Use unkey cache for the created at filter caching * Remove 2 unused indexes on TaskRun * Run list now filters by a single runtime environment * Remove project ID indexes * Use clickhouse in task list aggregation queries instead of pg (keep pg for self-hosters) * WIP clickhouse powered runs list stuff * Improve the query to get the latest tasks for the task list presenter * Update the usage task list to use clickhouse * Implement next runs list powered by clickhouse * Add new index for TaskRun for the runs list, by environment ID * Add runTags gin index * Handle possibly malicious inputs * Ignore claude settings * Better handling not finding an environment on the schedule page * Use ms since epoch in test, not seconds * Remove unused function * Fix test * Use an env var for the realtime maximum createdAt filter duration (defaults to 1 day) * Fixed the query builder to correct the group by / order by order * Make sure runs.list still works * Create small-birds-arrive.md
1 parent e617c14 commit b38405c

File tree

62 files changed

+4256
-458
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+4256
-458
lines changed

.changeset/small-birds-arrive.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
---
4+
5+
Added the ability to specify a "createdAt" filter when subscribing to tags in our useRealtime hooks:
6+
7+
```tsx
8+
// Only subscribe to runs created in the last 10 hours
9+
useRealtimeRunWithTags("my-tag", { createdAt: "10h" })
10+
```
11+
12+
You can also now choose to skip subscribing to specific columns by specifying the `skipColumns` option:
13+
14+
```tsx
15+
useRealtimeRun(run.id, { skipColumns: ["usageDurationMs"] });
16+
```

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@ apps/**/public/build
6363
/packages/core/src/package.json
6464
/packages/trigger-sdk/src/package.json
6565
/packages/python/src/package.json
66+
.claude

apps/webapp/app/components/runs/v3/TaskRunsTable.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ function BlankState({ isLoading, filters }: Pick<RunsTableProps, "isLoading" | "
539539
const environment = useEnvironment();
540540
if (isLoading) return <TableBlankRow colSpan={15}></TableBlankRow>;
541541

542-
const { environments, tasks, from, to, ...otherFilters } = filters;
542+
const { tasks, from, to, ...otherFilters } = filters;
543543

544544
if (
545545
filters.tasks.length === 1 &&

apps/webapp/app/env.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const EnvironmentSchema = z.object({
3535
API_ORIGIN: z.string().optional(),
3636
STREAM_ORIGIN: z.string().optional(),
3737
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
38+
// A comma separated list of electric origins to shard into different electric instances by environmentId
39+
// example: "http://localhost:3060,http://localhost:3061,http://localhost:3062"
40+
ELECTRIC_ORIGIN_SHARDS: z.string().optional(),
3841
APP_ENV: z.string().default(process.env.NODE_ENV),
3942
SERVICE_NAME: z.string().default("trigger.dev webapp"),
4043
POSTHOG_PROJECT_KEY: z.string().default("phc_LFH7kJiGhdIlnO22hTAKgHpaKhpM8gkzWAFvHmf5vfS"),
@@ -161,6 +164,11 @@ const EnvironmentSchema = z.object({
161164
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
162165
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
163166

167+
REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
168+
.number()
169+
.int()
170+
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds
171+
164172
PUBSUB_REDIS_HOST: z
165173
.string()
166174
.optional()
@@ -738,6 +746,14 @@ const EnvironmentSchema = z.object({
738746
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
739747
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
740748

749+
// Clickhouse
750+
CLICKHOUSE_URL: z.string().optional(),
751+
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
752+
CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
753+
CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
754+
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
755+
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
756+
741757
// Bootstrap
742758
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
743759
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,36 @@ export function displayableEnvironment(
275275
userName,
276276
};
277277
}
278+
279+
export async function findDisplayableEnvironment(
280+
environmentId: string,
281+
userId: string | undefined
282+
) {
283+
const environment = await prisma.runtimeEnvironment.findFirst({
284+
where: {
285+
id: environmentId,
286+
},
287+
select: {
288+
id: true,
289+
type: true,
290+
slug: true,
291+
orgMember: {
292+
select: {
293+
user: {
294+
select: {
295+
id: true,
296+
name: true,
297+
displayName: true,
298+
},
299+
},
300+
},
301+
},
302+
},
303+
});
304+
305+
if (!environment) {
306+
return;
307+
}
308+
309+
return displayableEnvironment(environment, userId);
310+
}

apps/webapp/app/models/task.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";
77
* It has indexes for fast performance.
88
* It does NOT care about versions, so includes all tasks ever created.
99
*/
10-
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, projectId: string) {
10+
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, environmentId: string) {
1111
return prisma.$queryRaw<
1212
{
1313
slug: string;
@@ -16,6 +16,6 @@ export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, project
1616
>`
1717
SELECT DISTINCT(slug), "triggerSource"
1818
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
19-
WHERE "projectId" = ${projectId}
19+
WHERE "runtimeEnvironmentId" = ${environmentId}
2020
ORDER BY slug ASC;`;
2121
}

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { CoercedDate } from "~/utils/zod";
1212
import { ApiRetrieveRunPresenter } from "./ApiRetrieveRunPresenter.server";
1313
import { type RunListOptions, RunListPresenter } from "./RunListPresenter.server";
1414
import { BasePresenter } from "./basePresenter.server";
15+
import { ServiceValidationError } from "~/v3/services/baseService.server";
1516

1617
export const ApiRunListSearchParams = z.object({
1718
"page[size]": z.coerce.number().int().positive().min(1).max(100).optional(),
@@ -134,9 +135,11 @@ export class ApiRunListPresenter extends BasePresenter {
134135
options.direction = "backward";
135136
}
136137

138+
let environmentId: string | undefined;
139+
137140
// filters
138141
if (environment) {
139-
options.environments = [environment.id];
142+
environmentId = environment.id;
140143
} else {
141144
if (searchParams["filter[env]"]) {
142145
const environments = await this._prisma.runtimeEnvironment.findMany({
@@ -148,10 +151,14 @@ export class ApiRunListPresenter extends BasePresenter {
148151
},
149152
});
150153

151-
options.environments = environments.map((env) => env.id);
154+
environmentId = environments.at(0)?.id;
152155
}
153156
}
154157

158+
if (!environmentId) {
159+
throw new ServiceValidationError("No environment found");
160+
}
161+
155162
if (searchParams["filter[status]"]) {
156163
options.statuses = searchParams["filter[status]"].flatMap((status) =>
157164
ApiRunListPresenter.apiStatusToRunStatuses(status)
@@ -202,9 +209,9 @@ export class ApiRunListPresenter extends BasePresenter {
202209

203210
logger.debug("Calling RunListPresenter", { options });
204211

205-
const results = await presenter.call(options);
212+
const results = await presenter.call(environmentId, options);
206213

207-
logger.debug("RunListPresenter results", { results });
214+
logger.debug("RunListPresenter results", { runs: results.runs.length });
208215

209216
const data: ListRunResponseItem[] = await Promise.all(
210217
results.runs.map(async (run) => {

0 commit comments

Comments
 (0)