Skip to content

fix & enable spawner in desktop app; use trpc #566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
"codepod": "./build/bin.js"
},
"dependencies": {
"@codepod/runtime": "workspace:^",
"@codepod/ui": "workspace:^",
"@codepod/yjs": "workspace:^",
"@trpc/server": "^10.43.0",
"commander": "^11.0.0",
"cors": "^2.8.5",
"express": "^4.18.2",
Expand All @@ -21,7 +24,8 @@
"lodash": "^4.17.21",
"ws": "^8.2.3",
"y-protocols": "^1.0.5",
"yjs": "^13.6.7"
"yjs": "^13.6.7",
"zod": "^3.22.4"
},
"devDependencies": {
"@types/express": "^4.17.14",
Expand Down
10 changes: 10 additions & 0 deletions apps/api/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import express from "express";
import http from "http";
import { WebSocketServer } from "ws";

import * as trpcExpress from "@trpc/server/adapters/express";

import { createSetupWSConnection } from "./yjs/yjs-setupWS";
import { bindState, writeState } from "./yjs-blob";

import cors from "cors";
import { appRouter } from "./trpc";

export async function startServer({ port, blobDir }) {
console.log("starting server ..");
Expand All @@ -18,6 +21,13 @@ export async function startServer({ port, blobDir }) {
console.log("html path: ", path);
app.use(express.static(path));

app.use(
"/trpc",
trpcExpress.createExpressMiddleware({
router: appRouter,
})
);

const http_server = http.createServer(app);

// Yjs websocket
Expand Down
File renamed without changes.
237 changes: 237 additions & 0 deletions apps/api/src/trpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import { initTRPC } from "@trpc/server";
const t = initTRPC.create();
export const router = t.router;
export const publicProcedure = t.procedure;

import Y from "yjs";
import WebSocket from "ws";
import { z } from "zod";

import { WebsocketProvider } from "@codepod/yjs/src/y-websocket";

import { killRuntime, spawnRuntime } from "./spawner_native";

import { connectSocket, runtime2socket, RuntimeInfo } from "./yjs_runtime";

// FIXME need to have a TTL to clear the ydoc.
const docs: Map<string, Y.Doc> = new Map();

// FIXME hard-coded yjs server url
const yjsServerUrl = `ws://localhost:4000/socket`;

async function getMyYDoc({ repoId }): Promise<Y.Doc> {
return new Promise((resolve, reject) => {
const oldydoc = docs.get(repoId);
if (oldydoc) {
resolve(oldydoc);
return;
}
const ydoc = new Y.Doc();
// connect to primary database
console.log("connecting to y-websocket provider", yjsServerUrl);
const provider = new WebsocketProvider(yjsServerUrl, repoId, ydoc, {
// resyncInterval: 2000,
//
// BC is more complex to track our custom Uploading status and SyncDone events.
disableBc: true,
params: {
role: "runtime",
},
// IMPORTANT: import websocket, because we're running it in node.js
WebSocketPolyfill: WebSocket as any,
});
provider.on("status", ({ status }) => {
console.log("provider status", status);
});
provider.once("synced", () => {
console.log("Provider synced");
docs.set(repoId, ydoc);
resolve(ydoc);
});
provider.connect();
});
}

const routingTable: Map<string, string> = new Map();

const spawnRouter = router({
spawnRuntime: publicProcedure
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
.mutation(async ({ input: { runtimeId, repoId } }) => {
console.log("spawnRuntime", runtimeId, repoId);
// create the runtime container
const wsUrl = await spawnRuntime(runtimeId);
console.log("Runtime spawned at", wsUrl);
routingTable.set(runtimeId, wsUrl);
// set initial runtimeMap info for this runtime
console.log("Loading yDoc ..");
const doc = await getMyYDoc({ repoId });
console.log("yDoc loaded");
const rootMap = doc.getMap("rootMap");
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
runtimeMap.set(runtimeId, {});
// console.log("=== runtimeMap", runtimeMap);
let values = Array.from(runtimeMap.values());
const keys = Array.from(runtimeMap.keys());
console.log("all runtimes", keys);
const nodesMap = rootMap.get("nodesMap") as Y.Map<any>;
const nodes = Array.from(nodesMap.values());
console.log("all nodes", nodes);
return true;
}),
killRuntime: publicProcedure
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
.mutation(async ({ input: { runtimeId, repoId } }) => {
await killRuntime(runtimeId);
console.log("Removing route ..");
// remove from runtimeMap
const doc = await getMyYDoc({ repoId });
const rootMap = doc.getMap("rootMap");
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
runtimeMap.delete(runtimeId);
routingTable.delete(runtimeId);
return true;
}),

connectRuntime: publicProcedure
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
.mutation(async ({ input: { runtimeId, repoId } }) => {
console.log("=== connectRuntime", runtimeId, repoId);
// assuming doc is already loaded.
// FIXME this socket/ is the prefix of url. This is very prone to errors.
const doc = await getMyYDoc({ repoId });
const rootMap = doc.getMap("rootMap");
console.log("rootMap", Array.from(rootMap.keys()));
const runtimeMap = rootMap.get("runtimeMap") as any;
const resultMap = rootMap.get("resultMap") as any;
await connectSocket({
runtimeId,
runtimeMap,
resultMap,
routingTable,
});
}),
disconnectRuntime: publicProcedure
.input(z.object({ runtimeId: z.string(), repoId: z.string() }))
.mutation(async ({ input: { runtimeId, repoId } }) => {
console.log("=== disconnectRuntime", runtimeId);
// get socket
const socket = runtime2socket.get(runtimeId);
if (socket) {
socket.close();
runtime2socket.delete(runtimeId);
}

const doc = await getMyYDoc({ repoId });
const rootMap = doc.getMap("rootMap");
const runtimeMap = rootMap.get("runtimeMap") as Y.Map<RuntimeInfo>;
runtimeMap.set(runtimeId, {});
}),
runCode: publicProcedure
.input(
z.object({
runtimeId: z.string(),
spec: z.object({ code: z.string(), podId: z.string() }),
})
)
.mutation(
async ({
input: {
runtimeId,
spec: { code, podId },
},
}) => {
console.log("runCode", runtimeId, podId);
const socket = runtime2socket.get(runtimeId);
if (!socket) return false;
// clear old results
// TODO move this to frontend, because it is hard to get ydoc in GraphQL handler.
//
// console.log("clear old result");
// console.log("old", resultMap.get(runtimeId));
// resultMap.set(podId, { data: [] });
// console.log("new", resultMap.get(runtimeId));
// console.log("send new result");
socket.send(
JSON.stringify({
type: "runCode",
payload: {
lang: "python",
code: code,
raw: true,
podId: podId,
sessionId: runtimeId,
},
})
);
return true;
}
),
runChain: publicProcedure
.input(
z.object({
runtimeId: z.string(),
specs: z.array(z.object({ code: z.string(), podId: z.string() })),
})
)
.mutation(async ({ input: { runtimeId, specs } }) => {
console.log("runChain", runtimeId);
const socket = runtime2socket.get(runtimeId);
if (!socket) return false;
specs.forEach(({ code, podId }) => {
socket.send(
JSON.stringify({
type: "runCode",
payload: {
lang: "python",
code: code,
raw: true,
podId: podId,
sessionId: runtimeId,
},
})
);
});
return true;
}),
interruptKernel: publicProcedure
.input(z.object({ runtimeId: z.string() }))
.mutation(async ({ input: { runtimeId } }) => {
const socket = runtime2socket.get(runtimeId);
if (!socket) return false;
socket.send(
JSON.stringify({
type: "interruptKernel",
payload: {
sessionId: runtimeId,
},
})
);
return true;
}),
requestKernelStatus: publicProcedure
.input(z.object({ runtimeId: z.string() }))
.mutation(async ({ input: { runtimeId } }) => {
console.log("requestKernelStatus", runtimeId);
const socket = runtime2socket.get(runtimeId);
if (!socket) {
console.log("WARN: socket not found");
return false;
}
socket.send(
JSON.stringify({
type: "requestKernelStatus",
payload: {
sessionId: runtimeId,
},
})
);
return true;
}),
});

export const appRouter = router({
spawner: spawnRouter, // put procedures under "post" namespace
});

export type AppRouter = typeof appRouter;
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as Y from "yjs";
import WebSocket from "ws";
import { ApolloClient, InMemoryCache, gql } from "@apollo/client";

export type PodResult = {
exec_count?: number;
Expand Down
3 changes: 0 additions & 3 deletions apps/spawner/.gitignore

This file was deleted.

47 changes: 0 additions & 47 deletions apps/spawner/package.json

This file was deleted.

16 changes: 0 additions & 16 deletions apps/spawner/src/run-docker.ts

This file was deleted.

15 changes: 0 additions & 15 deletions apps/spawner/src/run-native.ts

This file was deleted.

Loading