Skip to content

Commit 3876a33

Browse files
committed
Check upload phase and resolve video source
1 parent 4e58ed4 commit 3876a33

File tree

3 files changed

+111
-24
lines changed

3 files changed

+111
-24
lines changed

apps/web/__tests__/integration/transcribe.test.ts

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ vi.mock("@cap/env", () => ({
88
}));
99

1010
const mockStart = vi.hoisted(() => vi.fn());
11+
const schemaMocks = vi.hoisted(() => ({
12+
videos: { id: "id", settings: "settings" },
13+
organizations: { id: "id", settings: "settings" },
14+
s3Buckets: { id: "id" },
15+
videoUploads: { videoId: "videoId", phase: "phase" },
16+
}));
1117

1218
vi.mock("workflow/api", () => ({
1319
start: mockStart,
@@ -18,19 +24,27 @@ vi.mock("@/workflows/transcribe", () => ({
1824
}));
1925

2026
let mockQueryResult: unknown[] = [];
27+
let mockUploadQueryResult: unknown[] = [];
2128

2229
vi.mock("@cap/database", () => ({
2330
db: () => ({
2431
select: () => ({
25-
from: () => ({
26-
leftJoin: () => ({
27-
leftJoin: () => ({
28-
where: vi
29-
.fn()
30-
.mockImplementation(() => Promise.resolve(mockQueryResult)),
31-
}),
32-
}),
33-
}),
32+
from: (table: unknown) =>
33+
table === schemaMocks.videoUploads
34+
? {
35+
where: vi.fn().mockReturnValue({
36+
limit: vi.fn().mockResolvedValue(mockUploadQueryResult),
37+
}),
38+
}
39+
: {
40+
leftJoin: () => ({
41+
leftJoin: () => ({
42+
where: vi
43+
.fn()
44+
.mockImplementation(() => Promise.resolve(mockQueryResult)),
45+
}),
46+
}),
47+
},
3448
}),
3549
update: () => ({
3650
set: () => ({
@@ -41,9 +55,10 @@ vi.mock("@cap/database", () => ({
4155
}));
4256

4357
vi.mock("@cap/database/schema", () => ({
44-
videos: { id: "id", settings: "settings" },
45-
organizations: { id: "id", settings: "settings" },
46-
s3Buckets: { id: "id" },
58+
videos: schemaMocks.videos,
59+
organizations: schemaMocks.organizations,
60+
s3Buckets: schemaMocks.s3Buckets,
61+
videoUploads: schemaMocks.videoUploads,
4762
}));
4863

4964
vi.mock("drizzle-orm", () => ({
@@ -58,6 +73,7 @@ describe("transcribeVideo", () => {
5873
beforeEach(() => {
5974
vi.clearAllMocks();
6075
mockQueryResult = [];
76+
mockUploadQueryResult = [];
6177
});
6278

6379
describe("input validation", () => {
@@ -266,6 +282,19 @@ describe("transcribeVideo", () => {
266282
mockStart.mockResolvedValue({ id: "workflow-run-123" });
267283
});
268284

285+
it("does not trigger while upload is still active", async () => {
286+
mockUploadQueryResult = [{ phase: "processing" }];
287+
288+
const result = await transcribeVideo(
289+
"video-123" as Video.VideoId,
290+
"user-456",
291+
);
292+
293+
expect(result.success).toBe(true);
294+
expect(result.message).toBe("Video upload is still in progress");
295+
expect(mockStart).not.toHaveBeenCalled();
296+
});
297+
269298
it("triggers workflow for valid video", async () => {
270299
const result = await transcribeVideo(
271300
"video-123" as Video.VideoId,

apps/web/lib/transcribe.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { db } from "@cap/database";
2-
import { organizations, s3Buckets, videos } from "@cap/database/schema";
2+
import {
3+
organizations,
4+
s3Buckets,
5+
videos,
6+
videoUploads,
7+
} from "@cap/database/schema";
38
import { serverEnv } from "@cap/env";
49
import type { Video } from "@cap/web-domain";
510
import { eq } from "drizzle-orm";
@@ -95,6 +100,23 @@ export async function transcribeVideo(
95100
};
96101
}
97102

103+
const upload = await db()
104+
.select({ phase: videoUploads.phase })
105+
.from(videoUploads)
106+
.where(eq(videoUploads.videoId, videoId))
107+
.limit(1);
108+
109+
if (
110+
upload[0]?.phase === "uploading" ||
111+
upload[0]?.phase === "processing" ||
112+
upload[0]?.phase === "generating_thumbnail"
113+
) {
114+
return {
115+
success: true,
116+
message: "Video upload is still in progress",
117+
};
118+
}
119+
98120
try {
99121
console.log(
100122
`[transcribeVideo] Triggering transcription workflow for video ${videoId}`,

apps/web/workflows/transcribe.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { promises as fs } from "node:fs";
22
import { db } from "@cap/database";
3-
import { organizations, s3Buckets, users, videos } from "@cap/database/schema";
3+
import {
4+
organizations,
5+
s3Buckets,
6+
users,
7+
videos,
8+
videoUploads,
9+
} from "@cap/database/schema";
410
import type { VideoMetadata } from "@cap/database/types";
511
import { serverEnv } from "@cap/env";
612
import { userIsPro } from "@cap/utils";
@@ -175,16 +181,7 @@ async function extractAudio(
175181
Option.fromNullable(bucketId),
176182
).pipe(runPromise);
177183

178-
const videoKey = `${userId}/${videoId}/result.mp4`;
179-
const videoUrl = await bucket.getSignedObjectUrl(videoKey).pipe(runPromise);
180-
181-
const response = await fetch(videoUrl, {
182-
method: "GET",
183-
headers: { range: "bytes=0-0" },
184-
});
185-
if (!response.ok) {
186-
throw new Error("Video file not accessible");
187-
}
184+
const videoUrl = await resolveVideoSourceUrl(videoId, userId, bucketId);
188185

189186
const useMediaServer = isMediaServerConfigured();
190187

@@ -228,6 +225,45 @@ async function extractAudio(
228225
return audioSignedUrl;
229226
}
230227

228+
async function resolveVideoSourceUrl(
229+
videoId: string,
230+
userId: string,
231+
bucketId: S3Bucket.S3BucketId | null,
232+
): Promise<string> {
233+
const [bucket] = await S3Buckets.getBucketAccess(
234+
Option.fromNullable(bucketId),
235+
).pipe(runPromise);
236+
237+
const upload = await db()
238+
.select({ rawFileKey: videoUploads.rawFileKey })
239+
.from(videoUploads)
240+
.where(eq(videoUploads.videoId, videoId as Video.VideoId))
241+
.limit(1);
242+
243+
const candidateKeys = [
244+
`${userId}/${videoId}/result.mp4`,
245+
upload[0]?.rawFileKey,
246+
].filter(
247+
(value, index, values): value is string =>
248+
Boolean(value) && values.indexOf(value) === index,
249+
);
250+
251+
for (const key of candidateKeys) {
252+
const url = await bucket.getSignedObjectUrl(key).pipe(runPromise);
253+
const response = await fetch(url, {
254+
method: "GET",
255+
headers: { range: "bytes=0-0" },
256+
});
257+
258+
if (response.ok) {
259+
console.log(`[transcribe] Using video source ${key}`);
260+
return url;
261+
}
262+
}
263+
264+
throw new Error("Video file not accessible");
265+
}
266+
231267
async function transcribeWithDeepgram(audioUrl: string): Promise<string> {
232268
"use step";
233269

0 commit comments

Comments
 (0)