Skip to content

Commit 7079322

Browse files
authored
fix & enable spawner in desktop app; use trpc (#566)
1 parent f03b6b0 commit 7079322

24 files changed

+470
-823
lines changed

apps/api/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
"codepod": "./build/bin.js"
1313
},
1414
"dependencies": {
15+
"@codepod/runtime": "workspace:^",
1516
"@codepod/ui": "workspace:^",
17+
"@codepod/yjs": "workspace:^",
18+
"@trpc/server": "^10.43.0",
1619
"commander": "^11.0.0",
1720
"cors": "^2.8.5",
1821
"express": "^4.18.2",
@@ -21,7 +24,8 @@
2124
"lodash": "^4.17.21",
2225
"ws": "^8.2.3",
2326
"y-protocols": "^1.0.5",
24-
"yjs": "^13.6.7"
27+
"yjs": "^13.6.7",
28+
"zod": "^3.22.4"
2529
},
2630
"devDependencies": {
2731
"@types/express": "^4.17.14",

apps/api/src/server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ import express from "express";
22
import http from "http";
33
import { WebSocketServer } from "ws";
44

5+
import * as trpcExpress from "@trpc/server/adapters/express";
6+
57
import { createSetupWSConnection } from "./yjs/yjs-setupWS";
68
import { bindState, writeState } from "./yjs-blob";
79

810
import cors from "cors";
11+
import { appRouter } from "./trpc";
912

1013
export async function startServer({ port, blobDir }) {
1114
console.log("starting server ..");
@@ -18,6 +21,13 @@ export async function startServer({ port, blobDir }) {
1821
console.log("html path: ", path);
1922
app.use(express.static(path));
2023

24+
app.use(
25+
"/trpc",
26+
trpcExpress.createExpressMiddleware({
27+
router: appRouter,
28+
})
29+
);
30+
2131
const http_server = http.createServer(app);
2232

2333
// Yjs websocket
File renamed without changes.

apps/api/src/trpc.ts

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import { initTRPC } from "@trpc/server";
2+
const t = initTRPC.create();
3+
export const router = t.router;
4+
export const publicProcedure = t.procedure;
5+
6+
import Y from "yjs";
7+
import WebSocket from "ws";
8+
import { z } from "zod";
9+
10+
import { WebsocketProvider } from "@codepod/yjs/src/y-websocket";
11+
12+
import { killRuntime, spawnRuntime } from "./spawner_native";
13+
14+
import { connectSocket, runtime2socket, RuntimeInfo } from "./yjs_runtime";
15+
16+
// FIXME need to have a TTL to clear the ydoc.
17+
const docs: Map<string, Y.Doc> = new Map();
18+
19+
// FIXME hard-coded yjs server url
20+
const yjsServerUrl = `ws://localhost:4000/socket`;
21+
22+
async function getMyYDoc({ repoId }): Promise<Y.Doc> {
23+
return new Promise((resolve, reject) => {
24+
const oldydoc = docs.get(repoId);
25+
if (oldydoc) {
26+
resolve(oldydoc);
27+
return;
28+
}
29+
const ydoc = new Y.Doc();
30+
// connect to primary database
31+
console.log("connecting to y-websocket provider", yjsServerUrl);
32+
const provider = new WebsocketProvider(yjsServerUrl, repoId, ydoc, {
33+
// resyncInterval: 2000,
34+
//
35+
// BC is more complex to track our custom Uploading status and SyncDone events.
36+
disableBc: true,
37+
params: {
38+
role: "runtime",
39+
},
40+
// IMPORTANT: import websocket, because we're running it in node.js
41+
WebSocketPolyfill: WebSocket as any,
42+
});
43+
provider.on("status", ({ status }) => {
44+
console.log("provider status", status);
45+
});
46+
provider.once("synced", () => {
47+
console.log("Provider synced");
48+
docs.set(repoId, ydoc);
49+
resolve(ydoc);
50+
});
51+
provider.connect();
52+
});
53+
}
54+
55+
const routingTable: Map<string, string> = new Map();
56+
57+
const spawnRouter = router({
58+
spawnRuntime: publicProcedure
59+
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
60+
.mutation(async ({ input: { runtimeId, repoId } }) => {
61+
console.log("spawnRuntime", runtimeId, repoId);
62+
// create the runtime container
63+
const wsUrl = await spawnRuntime(runtimeId);
64+
console.log("Runtime spawned at", wsUrl);
65+
routingTable.set(runtimeId, wsUrl);
66+
// set initial runtimeMap info for this runtime
67+
console.log("Loading yDoc ..");
68+
const doc = await getMyYDoc({ repoId });
69+
console.log("yDoc loaded");
70+
const rootMap = doc.getMap("rootMap");
71+
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
72+
runtimeMap.set(runtimeId, {});
73+
// console.log("=== runtimeMap", runtimeMap);
74+
let values = Array.from(runtimeMap.values());
75+
const keys = Array.from(runtimeMap.keys());
76+
console.log("all runtimes", keys);
77+
const nodesMap = rootMap.get("nodesMap") as Y.Map<any>;
78+
const nodes = Array.from(nodesMap.values());
79+
console.log("all nodes", nodes);
80+
return true;
81+
}),
82+
killRuntime: publicProcedure
83+
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
84+
.mutation(async ({ input: { runtimeId, repoId } }) => {
85+
await killRuntime(runtimeId);
86+
console.log("Removing route ..");
87+
// remove from runtimeMap
88+
const doc = await getMyYDoc({ repoId });
89+
const rootMap = doc.getMap("rootMap");
90+
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
91+
runtimeMap.delete(runtimeId);
92+
routingTable.delete(runtimeId);
93+
return true;
94+
}),
95+
96+
connectRuntime: publicProcedure
97+
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
98+
.mutation(async ({ input: { runtimeId, repoId } }) => {
99+
console.log("=== connectRuntime", runtimeId, repoId);
100+
// assuming doc is already loaded.
101+
// FIXME this socket/ is the prefix of url. This is very prone to errors.
102+
const doc = await getMyYDoc({ repoId });
103+
const rootMap = doc.getMap("rootMap");
104+
console.log("rootMap", Array.from(rootMap.keys()));
105+
const runtimeMap = rootMap.get("runtimeMap") as any;
106+
const resultMap = rootMap.get("resultMap") as any;
107+
await connectSocket({
108+
runtimeId,
109+
runtimeMap,
110+
resultMap,
111+
routingTable,
112+
});
113+
}),
114+
disconnectRuntime: publicProcedure
115+
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
116+
.mutation(async ({ input: { runtimeId, repoId } }) => {
117+
console.log("=== disconnectRuntime", runtimeId);
118+
// get socket
119+
const socket = runtime2socket.get(runtimeId);
120+
if (socket) {
121+
socket.close();
122+
runtime2socket.delete(runtimeId);
123+
}
124+
125+
const doc = await getMyYDoc({ repoId });
126+
const rootMap = doc.getMap("rootMap");
127+
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
128+
runtimeMap.set(runtimeId, {});
129+
}),
130+
runCode: publicProcedure
131+
.input(
132+
z.object({
133+
runtimeId: z.string(),
134+
spec: z.object({ code: z.string(), podId: z.string() }),
135+
})
136+
)
137+
.mutation(
138+
async ({
139+
input: {
140+
runtimeId,
141+
spec: { code, podId },
142+
},
143+
}) => {
144+
console.log("runCode", runtimeId, podId);
145+
const socket = runtime2socket.get(runtimeId);
146+
if (!socket) return false;
147+
// clear old results
148+
// TODO move this to frontend, because it is hard to get ydoc in GraphQL handler.
149+
//
150+
// console.log("clear old result");
151+
// console.log("old", resultMap.get(runtimeId));
152+
// resultMap.set(podId, { data: [] });
153+
// console.log("new", resultMap.get(runtimeId));
154+
// console.log("send new result");
155+
socket.send(
156+
JSON.stringify({
157+
type: "runCode",
158+
payload: {
159+
lang: "python",
160+
code: code,
161+
raw: true,
162+
podId: podId,
163+
sessionId: runtimeId,
164+
},
165+
})
166+
);
167+
return true;
168+
}
169+
),
170+
runChain: publicProcedure
171+
.input(
172+
z.object({
173+
runtimeId: z.string(),
174+
specs: z.array(z.object({ code: z.string(), podId: z.string() })),
175+
})
176+
)
177+
.mutation(async ({ input: { runtimeId, specs } }) => {
178+
console.log("runChain", runtimeId);
179+
const socket = runtime2socket.get(runtimeId);
180+
if (!socket) return false;
181+
specs.forEach(({ code, podId }) => {
182+
socket.send(
183+
JSON.stringify({
184+
type: "runCode",
185+
payload: {
186+
lang: "python",
187+
code: code,
188+
raw: true,
189+
podId: podId,
190+
sessionId: runtimeId,
191+
},
192+
})
193+
);
194+
});
195+
return true;
196+
}),
197+
interruptKernel: publicProcedure
198+
.input(z.object({ runtimeId: z.string() }))
199+
.mutation(async ({ input: { runtimeId } }) => {
200+
const socket = runtime2socket.get(runtimeId);
201+
if (!socket) return false;
202+
socket.send(
203+
JSON.stringify({
204+
type: "interruptKernel",
205+
payload: {
206+
sessionId: runtimeId,
207+
},
208+
})
209+
);
210+
return true;
211+
}),
212+
requestKernelStatus: publicProcedure
213+
.input(z.object({ runtimeId: z.string() }))
214+
.mutation(async ({ input: { runtimeId } }) => {
215+
console.log("requestKernelStatus", runtimeId);
216+
const socket = runtime2socket.get(runtimeId);
217+
if (!socket) {
218+
console.log("WARN: socket not found");
219+
return false;
220+
}
221+
socket.send(
222+
JSON.stringify({
223+
type: "requestKernelStatus",
224+
payload: {
225+
sessionId: runtimeId,
226+
},
227+
})
228+
);
229+
return true;
230+
}),
231+
});
232+
233+
export const appRouter = router({
234+
spawner: spawnRouter, // put procedures under "post" namespace
235+
});
236+
237+
export type AppRouter = typeof appRouter;

apps/spawner/src/yjs-runtime.ts renamed to apps/api/src/yjs_runtime.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import * as Y from "yjs";
22
import WebSocket from "ws";
3-
import { ApolloClient, InMemoryCache, gql } from "@apollo/client";
43

54
export type PodResult = {
65
exec_count?: number;

apps/spawner/.gitignore

Lines changed: 0 additions & 3 deletions
This file was deleted.

apps/spawner/package.json

Lines changed: 0 additions & 47 deletions
This file was deleted.

apps/spawner/src/run-docker.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

apps/spawner/src/run-native.ts

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)