From c92a41b16c54935ca9422910be8677e76e6d7fa5 Mon Sep 17 00:00:00 2001 From: Hebi Li Date: Thu, 26 Oct 2023 17:24:12 -0700 Subject: [PATCH] fix & enable spawner; use trpc --- apps/api/package.json | 6 +- apps/api/src/server.ts | 10 + .../src/spawner_native.ts} | 0 apps/api/src/trpc.ts | 237 +++++++++++++++ .../yjs-runtime.ts => api/src/yjs_runtime.ts} | 1 - apps/spawner/.gitignore | 3 - apps/spawner/package.json | 47 --- apps/spawner/src/run-docker.ts | 16 - apps/spawner/src/run-native.ts | 15 - apps/spawner/src/server.ts | 279 ------------------ apps/spawner/src/spawner-docker.ts | 264 ----------------- apps/spawner/tsconfig.json | 26 -- apps/ui/package.json | 4 + apps/ui/src/App.tsx | 32 +- apps/ui/src/components/Canvas.tsx | 19 +- apps/ui/src/components/MyMonaco.tsx | 13 +- apps/ui/src/components/Sidebar.tsx | 94 ++---- apps/ui/src/components/nodes/Code.tsx | 30 +- apps/ui/src/components/nodes/Scope.tsx | 14 +- apps/ui/src/lib/store/repoSlice.ts | 11 +- apps/ui/src/lib/store/runtimeSlice.ts | 46 +-- apps/ui/src/lib/trpc.ts | 3 + apps/ui/src/pages/repo.tsx | 45 +-- pnpm-lock.yaml | 78 +++++ 24 files changed, 470 insertions(+), 823 deletions(-) rename apps/{spawner/src/spawner-native.ts => api/src/spawner_native.ts} (100%) create mode 100644 apps/api/src/trpc.ts rename apps/{spawner/src/yjs-runtime.ts => api/src/yjs_runtime.ts} (98%) delete mode 100644 apps/spawner/.gitignore delete mode 100644 apps/spawner/package.json delete mode 100644 apps/spawner/src/run-docker.ts delete mode 100644 apps/spawner/src/run-native.ts delete mode 100644 apps/spawner/src/server.ts delete mode 100644 apps/spawner/src/spawner-docker.ts delete mode 100644 apps/spawner/tsconfig.json create mode 100644 apps/ui/src/lib/trpc.ts diff --git a/apps/api/package.json b/apps/api/package.json index 57f1ef3e..bb3c683b 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -12,7 +12,10 @@ "codepod": "./build/bin.js" }, "dependencies": { + "@codepod/runtime": "workspace:^", "@codepod/ui": "workspace:^", + "@codepod/yjs": "workspace:^", + "@trpc/server": "^10.43.0", "commander": "^11.0.0", "cors": "^2.8.5", "express": "^4.18.2", @@ -21,7 +24,8 @@ "lodash": "^4.17.21", "ws": "^8.2.3", "y-protocols": "^1.0.5", - "yjs": "^13.6.7" + "yjs": "^13.6.7", + "zod": "^3.22.4" }, "devDependencies": { "@types/express": "^4.17.14", diff --git a/apps/api/src/server.ts b/apps/api/src/server.ts index 9853118b..a555d5af 100644 --- a/apps/api/src/server.ts +++ b/apps/api/src/server.ts @@ -2,10 +2,13 @@ import express from "express"; import http from "http"; import { WebSocketServer } from "ws"; +import * as trpcExpress from "@trpc/server/adapters/express"; + import { createSetupWSConnection } from "./yjs/yjs-setupWS"; import { bindState, writeState } from "./yjs-blob"; import cors from "cors"; +import { appRouter } from "./trpc"; export async function startServer({ port, blobDir }) { console.log("starting server .."); @@ -18,6 +21,13 @@ export async function startServer({ port, blobDir }) { console.log("html path: ", path); app.use(express.static(path)); + app.use( + "/trpc", + trpcExpress.createExpressMiddleware({ + router: appRouter, + }) + ); + const http_server = http.createServer(app); // Yjs websocket diff --git a/apps/spawner/src/spawner-native.ts b/apps/api/src/spawner_native.ts similarity index 100% rename from apps/spawner/src/spawner-native.ts rename to apps/api/src/spawner_native.ts diff --git a/apps/api/src/trpc.ts b/apps/api/src/trpc.ts new file mode 100644 index 00000000..b928bda9 --- /dev/null +++ b/apps/api/src/trpc.ts @@ -0,0 +1,237 @@ +import { initTRPC } from "@trpc/server"; +const t = initTRPC.create(); +export const router = t.router; +export const publicProcedure = t.procedure; + +import Y from "yjs"; +import WebSocket from "ws"; +import { z } from "zod"; + +import { WebsocketProvider } from "@codepod/yjs/src/y-websocket"; + +import { killRuntime, spawnRuntime } from "./spawner_native"; + +import { connectSocket, runtime2socket, RuntimeInfo } from "./yjs_runtime"; + +// FIXME need to have a TTL to clear the ydoc. +const docs: Map = new Map(); + +// FIXME hard-coded yjs server url +const yjsServerUrl = `ws://localhost:4000/socket`; + +async function getMyYDoc({ repoId }): Promise { + return new Promise((resolve, reject) => { + const oldydoc = docs.get(repoId); + if (oldydoc) { + resolve(oldydoc); + return; + } + const ydoc = new Y.Doc(); + // connect to primary database + console.log("connecting to y-websocket provider", yjsServerUrl); + const provider = new WebsocketProvider(yjsServerUrl, repoId, ydoc, { + // resyncInterval: 2000, + // + // BC is more complex to track our custom Uploading status and SyncDone events. + disableBc: true, + params: { + role: "runtime", + }, + // IMPORTANT: import websocket, because we're running it in node.js + WebSocketPolyfill: WebSocket as any, + }); + provider.on("status", ({ status }) => { + console.log("provider status", status); + }); + provider.once("synced", () => { + console.log("Provider synced"); + docs.set(repoId, ydoc); + resolve(ydoc); + }); + provider.connect(); + }); +} + +const routingTable: Map = new Map(); + +const spawnRouter = router({ + spawnRuntime: publicProcedure + .input(z.object({ runtimeId: z.string(), repoId: z.string() })) + .mutation(async ({ input: { runtimeId, repoId } }) => { + console.log("spawnRuntime", runtimeId, repoId); + // create the runtime container + const wsUrl = await spawnRuntime(runtimeId); + console.log("Runtime spawned at", wsUrl); + routingTable.set(runtimeId, wsUrl); + // set initial runtimeMap info for this runtime + console.log("Loading yDoc .."); + const doc = await getMyYDoc({ repoId }); + console.log("yDoc loaded"); + const rootMap = doc.getMap("rootMap"); + const runtimeMap = rootMap.get("runtimeMap") as Y.Map; + runtimeMap.set(runtimeId, {}); + // console.log("=== runtimeMap", runtimeMap); + let values = Array.from(runtimeMap.values()); + const keys = Array.from(runtimeMap.keys()); + console.log("all runtimes", keys); + const nodesMap = rootMap.get("nodesMap") as Y.Map; + const nodes = Array.from(nodesMap.values()); + console.log("all nodes", nodes); + return true; + }), + killRuntime: publicProcedure + .input(z.object({ runtimeId: z.string(), repoId: z.string() })) + .mutation(async ({ input: { runtimeId, repoId } }) => { + await killRuntime(runtimeId); + console.log("Removing route .."); + // remove from runtimeMap + const doc = await getMyYDoc({ repoId }); + const rootMap = doc.getMap("rootMap"); + const runtimeMap = rootMap.get("runtimeMap") as Y.Map; + runtimeMap.delete(runtimeId); + routingTable.delete(runtimeId); + return true; + }), + + connectRuntime: publicProcedure + .input(z.object({ runtimeId: z.string(), repoId: z.string() })) + .mutation(async ({ input: { runtimeId, repoId } }) => { + console.log("=== connectRuntime", runtimeId, repoId); + // assuming doc is already loaded. + // FIXME this socket/ is the prefix of url. This is very prone to errors. + const doc = await getMyYDoc({ repoId }); + const rootMap = doc.getMap("rootMap"); + console.log("rootMap", Array.from(rootMap.keys())); + const runtimeMap = rootMap.get("runtimeMap") as any; + const resultMap = rootMap.get("resultMap") as any; + await connectSocket({ + runtimeId, + runtimeMap, + resultMap, + routingTable, + }); + }), + disconnectRuntime: publicProcedure + .input(z.object({ runtimeId: z.string(), repoId: z.string() })) + .mutation(async ({ input: { runtimeId, repoId } }) => { + console.log("=== disconnectRuntime", runtimeId); + // get socket + const socket = runtime2socket.get(runtimeId); + if (socket) { + socket.close(); + runtime2socket.delete(runtimeId); + } + + const doc = await getMyYDoc({ repoId }); + const rootMap = doc.getMap("rootMap"); + const runtimeMap = rootMap.get("runtimeMap") as Y.Map; + runtimeMap.set(runtimeId, {}); + }), + runCode: publicProcedure + .input( + z.object({ + runtimeId: z.string(), + spec: z.object({ code: z.string(), podId: z.string() }), + }) + ) + .mutation( + async ({ + input: { + runtimeId, + spec: { code, podId }, + }, + }) => { + console.log("runCode", runtimeId, podId); + const socket = runtime2socket.get(runtimeId); + if (!socket) return false; + // clear old results + // TODO move this to frontend, because it is hard to get ydoc in GraphQL handler. + // + // console.log("clear old result"); + // console.log("old", resultMap.get(runtimeId)); + // resultMap.set(podId, { data: [] }); + // console.log("new", resultMap.get(runtimeId)); + // console.log("send new result"); + socket.send( + JSON.stringify({ + type: "runCode", + payload: { + lang: "python", + code: code, + raw: true, + podId: podId, + sessionId: runtimeId, + }, + }) + ); + return true; + } + ), + runChain: publicProcedure + .input( + z.object({ + runtimeId: z.string(), + specs: z.array(z.object({ code: z.string(), podId: z.string() })), + }) + ) + .mutation(async ({ input: { runtimeId, specs } }) => { + console.log("runChain", runtimeId); + const socket = runtime2socket.get(runtimeId); + if (!socket) return false; + specs.forEach(({ code, podId }) => { + socket.send( + JSON.stringify({ + type: "runCode", + payload: { + lang: "python", + code: code, + raw: true, + podId: podId, + sessionId: runtimeId, + }, + }) + ); + }); + return true; + }), + interruptKernel: publicProcedure + .input(z.object({ runtimeId: z.string() })) + .mutation(async ({ input: { runtimeId } }) => { + const socket = runtime2socket.get(runtimeId); + if (!socket) return false; + socket.send( + JSON.stringify({ + type: "interruptKernel", + payload: { + sessionId: runtimeId, + }, + }) + ); + return true; + }), + requestKernelStatus: publicProcedure + .input(z.object({ runtimeId: z.string() })) + .mutation(async ({ input: { runtimeId } }) => { + console.log("requestKernelStatus", runtimeId); + const socket = runtime2socket.get(runtimeId); + if (!socket) { + console.log("WARN: socket not found"); + return false; + } + socket.send( + JSON.stringify({ + type: "requestKernelStatus", + payload: { + sessionId: runtimeId, + }, + }) + ); + return true; + }), +}); + +export const appRouter = router({ + spawner: spawnRouter, // put procedures under "post" namespace +}); + +export type AppRouter = typeof appRouter; diff --git a/apps/spawner/src/yjs-runtime.ts b/apps/api/src/yjs_runtime.ts similarity index 98% rename from apps/spawner/src/yjs-runtime.ts rename to apps/api/src/yjs_runtime.ts index 52b16c05..d621c601 100644 --- a/apps/spawner/src/yjs-runtime.ts +++ b/apps/api/src/yjs_runtime.ts @@ -1,6 +1,5 @@ import * as Y from "yjs"; import WebSocket from "ws"; -import { ApolloClient, InMemoryCache, gql } from "@apollo/client"; export type PodResult = { exec_count?: number; diff --git a/apps/spawner/.gitignore b/apps/spawner/.gitignore deleted file mode 100644 index 7e052379..00000000 --- a/apps/spawner/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -dist/ -conns/conn-*.json -prisma/dev.db \ No newline at end of file diff --git a/apps/spawner/package.json b/apps/spawner/package.json deleted file mode 100644 index d59f2513..00000000 --- a/apps/spawner/package.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "name": "@codepod/spawner", - "version": "1.0.0", - "license": "MIT", - "scripts": { - "build": "tsc", - "start": "node build/run.js", - "dev:native": "ts-node-dev src/run-native.ts", - "dev:docker": "ts-node-dev src/run-docker.ts", - "test": "jest --config jest.config.js" - }, - "dependencies": { - "@apollo/client": "^3.7.1", - "@codepod/prisma": "workspace:*", - "@codepod/runtime": "workspace:*", - "@codepod/yjs": "workspace:*", - "@kubernetes/client-node": "^0.17.1", - "apollo-server": "^3.5.0", - "apollo-server-core": "^3.10.3", - "apollo-server-express": "3.10.2", - "dockerode": "^3.3.1", - "express": "^4.18.2", - "graphql": "16.6.0", - "jest": "^29.0.3", - "jsdom": "^22.1.0", - "jsonwebtoken": "^8.5.1", - "lib0": "^0.2.83", - "lodash": "^4.17.21", - "prosemirror-model": "^1.19.3", - "prosemirror-view": "^1.31.7", - "reactflow": "^11.7.4", - "ws": "^8.2.3", - "y-prosemirror": "^1.2.1", - "y-protocols": "^1.0.5", - "yjs": "^13.6.7" - }, - "devDependencies": { - "@types/dockerode": "^3.3.11", - "@types/express": "^4.17.14", - "@types/jsdom": "^21.1.1", - "@types/jsonwebtoken": "^8.5.9", - "@types/node": "^18.11.2", - "@types/ws": "^8.5.3", - "ts-node-dev": "^2.0.0", - "typescript": "^4.4.4" - } -} diff --git a/apps/spawner/src/run-docker.ts b/apps/spawner/src/run-docker.ts deleted file mode 100644 index 6b9f2ff2..00000000 --- a/apps/spawner/src/run-docker.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { startAPIServer } from "./server"; - -import { killRuntime, spawnRuntime } from "./spawner-docker"; - -if (!process.env.JWT_SECRET) { - throw new Error("JWT_SECRET env variable is not set."); -} - -startAPIServer({ port: 4021, spawnRuntime, killRuntime }); - -// ts-node-dev might fail to restart. Force the exiting and restarting. Ref: -// https://github.com/wclr/ts-node-dev/issues/69#issuecomment-493675960 -process.on("SIGTERM", () => { - console.log("Received SIGTERM. Exiting..."); - process.exit(); -}); diff --git a/apps/spawner/src/run-native.ts b/apps/spawner/src/run-native.ts deleted file mode 100644 index a1753aee..00000000 --- a/apps/spawner/src/run-native.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { startAPIServer } from "./server"; -import { killRuntime, spawnRuntime } from "./spawner-native"; - -if (!process.env.JWT_SECRET) { - throw new Error("JWT_SECRET env variable is not set."); -} - -startAPIServer({ port: 4021, spawnRuntime, killRuntime }); - -// ts-node-dev might fail to restart. Force the exiting and restarting. Ref: -// https://github.com/wclr/ts-node-dev/issues/69#issuecomment-493675960 -process.on("SIGTERM", () => { - console.log("Received SIGTERM. Exiting..."); - process.exit(); -}); diff --git a/apps/spawner/src/server.ts b/apps/spawner/src/server.ts deleted file mode 100644 index 31900fab..00000000 --- a/apps/spawner/src/server.ts +++ /dev/null @@ -1,279 +0,0 @@ -import { WebSocketServer } from "ws"; - -import express from "express"; -import http from "http"; - -import jwt from "jsonwebtoken"; - -import { ApolloServer } from "apollo-server-express"; - -import { gql } from "apollo-server"; -import { ApolloServerPluginLandingPageLocalDefault } from "apollo-server-core"; - -import Y from "yjs"; -import { WebsocketProvider } from "@codepod/yjs/src/y-websocket"; -import WebSocket from "ws"; - -import { connectSocket, runtime2socket, RuntimeInfo } from "./yjs-runtime"; - -interface TokenInterface { - id: string; -} - -// This runtime server needs to maintain a connection to Yjs server - -// const yjsServerUrl = "ws://localhost:4010"; -// const yjsServerUrl = `ws://${process.env.YJS_SERVER_HOST}:${process.env.YJS_SERVER_PORT}` -if (!process.env.YJS_WS_URL) throw new Error("YJS_WS_URL is not set."); -const yjsServerUrl = process.env.YJS_WS_URL; - -// FIXME need to have a TTL to clear the ydoc. -const docs: Map = new Map(); - -async function getMyYDoc({ repoId, token }): Promise { - return new Promise((resolve, reject) => { - const oldydoc = docs.get(repoId); - if (oldydoc) { - resolve(oldydoc); - return; - } - const ydoc = new Y.Doc(); - // connect to primary database - console.log("connecting to y-websocket provider", yjsServerUrl); - const provider = new WebsocketProvider(yjsServerUrl, repoId, ydoc, { - // resyncInterval: 2000, - // - // BC is more complex to track our custom Uploading status and SyncDone events. - disableBc: true, - params: { - token, - role: "runtime", - }, - // IMPORTANT: import websocket, because we're running it in node.js - WebSocketPolyfill: WebSocket as any, - }); - provider.on("status", ({ status }) => { - console.log("provider status", status); - }); - provider.once("synced", () => { - console.log("Provider synced"); - docs.set(repoId, ydoc); - resolve(ydoc); - }); - provider.connect(); - }); -} - -const routingTable: Map = new Map(); - -export async function startAPIServer({ port, spawnRuntime, killRuntime }) { - const apollo = new ApolloServer({ - context: ({ req }) => { - const token = req?.headers?.authorization?.slice(7); - let userId; - - if (token) { - const decoded = jwt.verify( - token, - process.env.JWT_SECRET as string - ) as TokenInterface; - userId = decoded.id; - } - return { - userId, - token, - }; - }, - typeDefs: gql` - type RouteInfo { - url: String - lastActive: String - } - - input RunSpecInput { - code: String - podId: String - } - - type Query { - hello: String - } - - type Mutation { - spawnRuntime(runtimeId: String, repoId: String): Boolean - killRuntime(runtimeId: String, repoId: String): Boolean - - connectRuntime(runtimeId: String, repoId: String): Boolean - disconnectRuntime(runtimeId: String, repoId: String): Boolean - runCode(runtimeId: String, spec: RunSpecInput): Boolean - runChain(runtimeId: String, specs: [RunSpecInput]): Boolean - interruptKernel(runtimeId: String): Boolean - requestKernelStatus(runtimeId: String): Boolean - } - `, - resolvers: { - Query: {}, - Mutation: { - spawnRuntime: async (_, { runtimeId, repoId }, { token, userId }) => { - // TODO verify repoId is owned by userId - if (!userId) throw new Error("Not authorized."); - // create the runtime container - const wsUrl = await spawnRuntime(runtimeId); - console.log("Runtime spawned at", wsUrl); - routingTable.set(runtimeId, wsUrl); - // set initial runtimeMap info for this runtime - console.log("Loading yDoc .."); - const doc = await getMyYDoc({ repoId, token }); - console.log("yDoc loaded"); - const rootMap = doc.getMap("rootMap"); - const runtimeMap = rootMap.get("runtimeMap") as Y.Map; - runtimeMap.set(runtimeId, {}); - return true; - }, - killRuntime: async (_, { runtimeId, repoId }, { token, userId }) => { - if (!userId) throw new Error("Not authorized."); - await killRuntime(runtimeId); - console.log("Removing route .."); - // remove from runtimeMap - const doc = await getMyYDoc({ repoId, token }); - const rootMap = doc.getMap("rootMap"); - const runtimeMap = rootMap.get("runtimeMap") as Y.Map; - runtimeMap.delete(runtimeId); - routingTable.delete(runtimeId); - return true; - }, - - connectRuntime: async (_, { runtimeId, repoId }, { token, userId }) => { - if (!userId) throw new Error("Not authorized."); - console.log("=== connectRuntime", runtimeId, repoId); - // assuming doc is already loaded. - // FIXME this socket/ is the prefix of url. This is very prone to errors. - const doc = await getMyYDoc({ repoId, token }); - const rootMap = doc.getMap("rootMap"); - console.log("rootMap", Array.from(rootMap.keys())); - const runtimeMap = rootMap.get("runtimeMap") as any; - const resultMap = rootMap.get("resultMap") as any; - await connectSocket({ - runtimeId, - runtimeMap, - resultMap, - routingTable, - }); - }, - disconnectRuntime: async ( - _, - { runtimeId, repoId }, - { token, userId } - ) => { - if (!userId) throw new Error("Not authorized."); - console.log("=== disconnectRuntime", runtimeId); - // get socket - const socket = runtime2socket.get(runtimeId); - if (socket) { - socket.close(); - runtime2socket.delete(runtimeId); - } - - const doc = await getMyYDoc({ repoId, token }); - const rootMap = doc.getMap("rootMap"); - const runtimeMap = rootMap.get("runtimeMap") as Y.Map; - runtimeMap.set(runtimeId, {}); - }, - runCode: async ( - _, - { runtimeId, spec: { code, podId } }, - { userId } - ) => { - if (!userId) throw new Error("Not authorized."); - console.log("runCode", runtimeId, podId); - const socket = runtime2socket.get(runtimeId); - if (!socket) return false; - // clear old results - // TODO move this to frontend, because it is hard to get ydoc in GraphQL handler. - // - // console.log("clear old result"); - // console.log("old", resultMap.get(runtimeId)); - // resultMap.set(podId, { data: [] }); - // console.log("new", resultMap.get(runtimeId)); - // console.log("send new result"); - socket.send( - JSON.stringify({ - type: "runCode", - payload: { - lang: "python", - code: code, - raw: true, - podId: podId, - sessionId: runtimeId, - }, - }) - ); - return true; - }, - runChain: async (_, { runtimeId, specs }, { userId }) => { - if (!userId) throw new Error("Not authorized."); - console.log("runChain", runtimeId, specs.podId); - const socket = runtime2socket.get(runtimeId); - if (!socket) return false; - specs.forEach(({ code, podId }) => { - socket.send( - JSON.stringify({ - type: "runCode", - payload: { - lang: "python", - code: code, - raw: true, - podId: podId, - sessionId: runtimeId, - }, - }) - ); - }); - return true; - }, - interruptKernel: async (_, { runtimeId }, { userId }) => { - if (!userId) throw new Error("Not authorized."); - const socket = runtime2socket.get(runtimeId); - if (!socket) return false; - socket.send( - JSON.stringify({ - type: "interruptKernel", - payload: { - sessionId: runtimeId, - }, - }) - ); - return true; - }, - requestKernelStatus: async (_, { runtimeId }, { userId }) => { - if (!userId) throw new Error("Not authorized."); - console.log("requestKernelStatus", runtimeId); - const socket = runtime2socket.get(runtimeId); - if (!socket) { - console.log("WARN: socket not found"); - return false; - } - socket.send( - JSON.stringify({ - type: "requestKernelStatus", - payload: { - sessionId: runtimeId, - }, - }) - ); - return true; - }, - }, - }, - plugins: [ApolloServerPluginLandingPageLocalDefault({ embed: true })], - }); - const expapp = express(); - const http_server = http.createServer(expapp); - // graphql api will be available at /graphql - - await apollo.start(); - apollo.applyMiddleware({ app: expapp }); - http_server.listen({ port }, () => { - console.log(`🚀 API server ready at http://localhost:${port}`); - }); -} diff --git a/apps/spawner/src/spawner-docker.ts b/apps/spawner/src/spawner-docker.ts deleted file mode 100644 index c36e99e9..00000000 --- a/apps/spawner/src/spawner-docker.ts +++ /dev/null @@ -1,264 +0,0 @@ -import Docker from "dockerode"; - -import { ApolloClient, InMemoryCache, gql } from "@apollo/client/core"; - -async function removeContainer(name) { - return new Promise((resolve, reject) => { - var docker = new Docker(); - console.log("remove if already exist"); - let old = docker.getContainer(name); - old.inspect((err, data) => { - if (err) { - console.log("removeContainer: container seems not exist."); - return resolve(null); - } - if (data?.State.Running) { - old.stop((err, data) => { - // FIXME If the container is stopped but not removed, will there be errors - // if I call stop? - if (err) { - // console.log("ERR:", err); - // console.log("No such container, resolving .."); - // reject(); - console.log("No such container running. Returning."); - return resolve(null); - } - console.log("Stopped. Removing .."); - old.remove((err, data) => { - if (err) { - console.log("ERR during removing container:", err); - return reject("ERROR!!!"); - // resolve(); - } - console.log("removed successfully"); - return resolve(null); - }); - }); - } else { - console.log("Already stopped. Removing .."); - old.remove((err, data) => { - if (err) { - console.log("ERR during removing container:", err); - return reject("ERROR!!!"); - // resolve(); - } - console.log("removed successfully"); - return resolve(null); - }); - } - }); - }); -} - -/** - * Load or create a docker container. - * @param image image name - * @param name name of container - * @param network which docker network to use - * @param Env additional optional env for the container - * @returns Boolean for whether a new container is created. - */ -async function loadOrCreateContainer(spec, network) { - console.log("loading container", spec.name); - let ip = await loadContainer(spec.name, network); - if (ip) return false; - console.log("beforing creating container, removing just in case .."); - await removeContainer(spec.name); - console.log("creating container .."); - await createContainer(spec, network); - return true; -} - -async function getContainerInfo(name): Promise { - return new Promise((resolve, reject) => { - var docker = new Docker(); - let container = docker.getContainer(name); - container.inspect((err, data) => { - if (err) { - console.log("getContainerInfo: container seems not exist."); - return resolve(null); - } - if (data?.State.Running) { - return resolve(data.State.StartedAt); - } - return resolve(null); - }); - }); -} - -async function loadContainer(name, network) { - // if already exists, just return the IP - // else, create and return the IP - return new Promise((resolve, reject) => { - var docker = new Docker(); - console.log("remove if already exist"); - let old = docker.getContainer(name); - old.inspect((err, data) => { - if (err) { - console.log("removeContainer: container seems not exist."); - return resolve(null); - } - if (data?.State.Running) { - // console.log(data.NetworkSettings.Networks); - let ip = data.NetworkSettings.Networks[network].IPAddress; - console.log("IP:", ip); - resolve(ip); - } else { - console.log("Already stopped. Removing .."); - old.remove((err, data) => { - if (err) { - console.log("ERR during removing container:", err); - return reject("ERROR!!!"); - // resolve(); - } - console.log("removed successfully"); - return resolve(null); - }); - } - }); - }); -} - -// return promise of IP address -async function createContainer(spec, network) { - return new Promise((resolve, reject) => { - var docker = new Docker(); - // spawn("docker", ["run", "-d", "jp-julia"]); - // 1. first check if the container already there. If so, stop and delete - // let name = "julia_kernel_X"; - console.log("spawning kernel in container .."); - docker.createContainer(spec, (err, container) => { - if (err) { - console.log("ERR:", err); - return; - } - container?.start((err, data) => { - console.log("Container started!"); - // console.log(container); - container.inspect((err, data) => { - // console.log("inspect"); - // let ip = data.NetworkSettings.IPAddress - // - // If created using codepod network bridge, the IP is here: - console.log(data?.NetworkSettings.Networks); - let ip = data?.NetworkSettings.Networks[network].IPAddress; - if (!ip) { - console.log( - "ERROR: IP not available. All network", - data?.NetworkSettings.Networks - ); - resolve(null); - } else { - console.log("IP:", ip); - resolve(ip); - } - }); - // console.log("IPaddress:", container.NetworkSettings.IPAddress) - }); - }); - }); -} - -export async function scanRunningSessions(): Promise { - return new Promise((resolve, reject) => { - var docker = new Docker(); - docker.listContainers((err, containers) => { - if (err) { - console.log("ERR:", err); - return; - } - let sessionIds = containers - ?.filter( - (c) => c.Names[0].startsWith("/cpkernel_") && c.State === "running" - ) - .map((c) => c.Names[0].substring("/cpkernel_".length)); - return resolve(sessionIds || []); - }); - }); -} - -/** - * - * @returns target url: ws://container:port - */ -export async function spawnRuntime(sessionId) { - // launch the kernel - console.log("Spawning "); - let url = `/${sessionId}`; - console.log("spawning kernel"); - let zmq_host = `cpkernel_${sessionId}`; - await loadOrCreateContainer( - { - Image: process.env.ZMQ_KERNEL_IMAGE, - name: zmq_host, - HostConfig: { - NetworkMode: "codepod", - Binds: [ - "dotjulia:/root/.julia", - "pipcache:/root/.cache/pip", - // FIXME hard coded dev_ prefix - "dev_shared_vol:/mount/shared", - ], - }, - }, - "codepod" - ); - console.log("spawning ws"); - let ws_host = `cpruntime_${sessionId}`; - if (process.env.PROJECT_ROOT) { - await loadOrCreateContainer( - { - Image: "node:18", - name: ws_host, - WorkingDir: "/app", - Cmd: ["sh", "-c", "yarn && yarn dev"], - Env: [`ZMQ_HOST=${zmq_host}`], - HostConfig: { - NetworkMode: "codepod", - Binds: [ - `${process.env.PROJECT_ROOT}/runtime:/app`, - "runtime-node-modules:/app/node_modules", - ], - }, - }, - "codepod" - ); - } else { - await loadOrCreateContainer( - { - Image: process.env.WS_RUNTIME_IMAGE, - name: ws_host, - Env: [`ZMQ_HOST=${zmq_host}`], - HostConfig: { - NetworkMode: "codepod", - }, - }, - "codepod" - ); - } - // This 4020 is the WS listening port in WS_RUNTIME_IMAGE - return `ws://${ws_host}:4020`; -} - -export async function killRuntime(sessionId) { - if (!sessionId) return false; - // TODO kill the runtime server. - // FIXME handle exception, and kill zombie containers - let url = `/${sessionId!}`; - let zmq_host = `cpkernel_${sessionId}`; - try { - await removeContainer(zmq_host); - } catch (e) { - console.log("Error removing container", zmq_host, e); - return false; - } - let ws_host = `cpruntime_${sessionId}`; - try { - await removeContainer(ws_host); - } catch (e) { - console.log("Error removing container", ws_host, e); - return false; - } - // remote route - return true; -} diff --git a/apps/spawner/tsconfig.json b/apps/spawner/tsconfig.json deleted file mode 100644 index a8a21c18..00000000 --- a/apps/spawner/tsconfig.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "compilerOptions": { - "module": "commonjs", - "outDir": "./build", - "target": "esnext", - "allowJs": true, - "allowSyntheticDefaultImports": true, - "esModuleInterop": true, - "lib": ["esnext"], - "moduleResolution": "node", - "noFallthroughCasesInSwitch": true, - "resolveJsonModule": true, - "skipLibCheck": true, - "strict": true, - "isolatedModules": true, - "noImplicitAny": false - }, - "ts-node": { - // these options are overrides used only by ts-node - // same as the --compilerOptions flag and the TS_NODE_COMPILER_OPTIONS environment variable - "compilerOptions": { - "module": "commonjs" - } - }, - "include": ["src"] -} diff --git a/apps/ui/package.json b/apps/ui/package.json index 12e50e21..360765a2 100644 --- a/apps/ui/package.json +++ b/apps/ui/package.json @@ -27,6 +27,10 @@ "@remirror/react": "^2.0.28", "@remirror/react-core": "^2.0.21", "@remirror/theme": "^2.0.9", + "@tanstack/react-query": "^4.18.0", + "@trpc/client": "^10.43.0", + "@trpc/react-query": "^10.43.0", + "@trpc/server": "^10.43.0", "ansi-to-react": "^6.1.6", "d3-force": "^3.0.0", "d3-quadtree": "^3.0.1", diff --git a/apps/ui/src/App.tsx b/apps/ui/src/App.tsx index 205c57fc..48b59189 100644 --- a/apps/ui/src/App.tsx +++ b/apps/ui/src/App.tsx @@ -21,6 +21,30 @@ import Box from "@mui/material/Box"; import { SnackbarProvider } from "notistack"; import { Typography } from "@mui/material"; +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import { httpBatchLink } from "@trpc/client"; +import React, { useState } from "react"; + +import { trpc } from "./lib/trpc"; + +export function TrpcProvider({ children }) { + const [queryClient] = useState(() => new QueryClient()); + const [trpcClient] = useState(() => + trpc.createClient({ + links: [ + httpBatchLink({ + url: "http://localhost:4000/trpc", + }), + ], + }) + ); + return ( + + {children} + + ); +} + // the url should be ws://:/socket let yjsWsUrl; @@ -57,9 +81,11 @@ export default function App() { return ( - - - + + + + + ); diff --git a/apps/ui/src/components/Canvas.tsx b/apps/ui/src/components/Canvas.tsx index a0b6f342..38f74240 100644 --- a/apps/ui/src/components/Canvas.tsx +++ b/apps/ui/src/components/Canvas.tsx @@ -49,6 +49,7 @@ import CustomConnectionLine from "./nodes/CustomConnectionLine"; import HelperLines from "./HelperLines"; import { getAbsPos, newNodeShapeConfig } from "../lib/store/canvasSlice"; import { useApolloClient } from "@apollo/client"; +import { trpc } from "../lib/trpc"; const nodeTypes = { SCOPE: ScopeNode, CODE: CodeNode, RICH: RichNode }; const edgeTypes = { @@ -149,8 +150,11 @@ function useJump() { const resetSelection = useStore(store, (state) => state.resetSelection); const selectPod = useStore(store, (state) => state.selectPod); - const yjsRun = useStore(store, (state) => state.yjsRun); - const apolloClient = useApolloClient(); + const preprocessChain = useStore(store, (state) => state.preprocessChain); + const getScopeChain = useStore(store, (state) => state.getScopeChain); + + const runChain = trpc.spawner.runChain.useMutation(); + const activeRuntime = useStore(store, (state) => state.activeRuntime); const setCenterSelection = useStore( store, @@ -240,7 +244,10 @@ function useJump() { if (pod.type == "CODE") { if (event.shiftKey) { // Hitting "SHIFT"+"Enter" will run the code pod - yjsRun(id, apolloClient); + if (activeRuntime) { + const specs = preprocessChain([id]); + if (specs) runChain.mutate({ runtimeId: activeRuntime, specs }); + } } else { // Hitting "Enter" on a Code pod will go to "Edit" mode. setFocusedEditor(id); @@ -248,7 +255,11 @@ function useJump() { } else if (pod.type === "SCOPE") { if (event.shiftKey) { // Hitting "SHIFT"+"Enter" on a Scope will run the scope. - yjsRun(id, apolloClient); + if (activeRuntime) { + const chain = getScopeChain(id); + const specs = preprocessChain(chain); + if (specs) runChain.mutate({ runtimeId: activeRuntime, specs }); + } } } else if (pod.type === "RICH") { // Hitting "Enter" on a Rich pod will go to "Edit" mode. diff --git a/apps/ui/src/components/MyMonaco.tsx b/apps/ui/src/components/MyMonaco.tsx index 7959cb7a..c1ba2570 100644 --- a/apps/ui/src/components/MyMonaco.tsx +++ b/apps/ui/src/components/MyMonaco.tsx @@ -11,6 +11,7 @@ import { MonacoBinding } from "y-monaco"; import { useReactFlow } from "reactflow"; import { Annotation } from "../lib/parser"; import { useApolloClient } from "@apollo/client"; +import { trpc } from "../lib/trpc"; const theme: monaco.editor.IStandaloneThemeData = { base: "vs", @@ -391,8 +392,11 @@ export const MyMonaco = memo(function MyMonaco({ console.debug("[perf] rendering MyMonaco", id); const store = useContext(RepoContext)!; const showLineNumbers = useStore(store, (state) => state.showLineNumbers); - const yjsRun = useStore(store, (state) => state.yjsRun); - const apolloClient = useApolloClient(); + const preprocessChain = useStore(store, (state) => state.preprocessChain); + + const runChain = trpc.spawner.runChain.useMutation(); + const activeRuntime = useStore(store, (state) => state.activeRuntime); + const focusedEditor = useStore(store, (state) => state.focusedEditor); const setFocusedEditor = useStore(store, (state) => state.setFocusedEditor); const annotations = useStore( @@ -464,7 +468,10 @@ export const MyMonaco = memo(function MyMonaco({ label: "Run", keybindings: [monaco.KeyMod.Shift | monaco.KeyCode.Enter], run: () => { - yjsRun(id, apolloClient); + if (activeRuntime) { + const specs = preprocessChain([id]); + if (specs) runChain.mutate({ runtimeId: activeRuntime, specs }); + } }, }); editor.addAction({ diff --git a/apps/ui/src/components/Sidebar.tsx b/apps/ui/src/components/Sidebar.tsx index 345de17c..22f4bc03 100644 --- a/apps/ui/src/components/Sidebar.tsx +++ b/apps/ui/src/components/Sidebar.tsx @@ -27,11 +27,6 @@ import TreeItem from "@mui/lab/TreeItem"; import { useSnackbar, VariantType } from "notistack"; -import { Node as ReactflowNode } from "reactflow"; -import { NodeData } from "../lib/store/canvasSlice"; -import * as Y from "yjs"; - -import { gql, useQuery, useMutation, useApolloClient } from "@apollo/client"; import { useStore } from "zustand"; import { MyKBar } from "./MyKBar"; @@ -56,6 +51,8 @@ import { getUpTime, myNanoId } from "../lib/utils/utils"; import { toSvg } from "html-to-image"; import { match } from "ts-pattern"; +import { trpc } from "../lib/trpc"; + function SidebarSettings() { const store = useContext(RepoContext)!; const scopedVars = useStore(store, (state) => state.scopedVars); @@ -523,23 +520,8 @@ const RuntimeMoreMenu = ({ runtimeId }) => { const runtimeMap = useStore(store, (state) => state.getRuntimeMap()); const repoId = useStore(store, (state) => state.repoId); - const [killRuntime] = useMutation( - gql` - mutation KillRuntime($runtimeId: String, $repoId: String) { - killRuntime(runtimeId: $runtimeId, repoId: $repoId) - } - `, - { context: { clientName: "spawner" } } - ); - - const [disconnectRuntime] = useMutation( - gql` - mutation DisconnectRuntime($runtimeId: String, $repoId: String) { - disconnectRuntime(runtimeId: $runtimeId, repoId: $repoId) - } - `, - { context: { clientName: "spawner" } } - ); + const killRuntime = trpc.spawner.killRuntime.useMutation(); + const disconnectRuntime = trpc.spawner.disconnectRuntime.useMutation(); return ( @@ -573,7 +555,7 @@ const RuntimeMoreMenu = ({ runtimeId }) => { { - disconnectRuntime({ variables: { runtimeId, repoId } }); + disconnectRuntime.mutate({ runtimeId, repoId }); handleClose(); }} > @@ -581,7 +563,7 @@ const RuntimeMoreMenu = ({ runtimeId }) => { { - killRuntime({ variables: { runtimeId, repoId: repoId } }); + killRuntime.mutate({ runtimeId, repoId: repoId }); handleClose(); }} > @@ -602,30 +584,10 @@ const RuntimeItem = ({ runtimeId }) => { const activeRuntime = useStore(store, (state) => state.activeRuntime); const runtime = runtimeMap.get(runtimeId)!; const repoId = useStore(store, (state) => state.repoId); - const [connect] = useMutation( - gql` - mutation ConnectRuntime($runtimeId: String, $repoId: String) { - connectRuntime(runtimeId: $runtimeId, repoId: $repoId) - } - `, - { context: { clientName: "spawner" } } - ); - const [requestKernelStatus] = useMutation( - gql` - mutation RequestKernelStatus($runtimeId: String) { - requestKernelStatus(runtimeId: $runtimeId) - } - `, - { context: { clientName: "spawner" } } - ); - const [interruptKernel] = useMutation( - gql` - mutation InterruptKernel($runtimeId: String) { - interruptKernel(runtimeId: $runtimeId) - } - `, - { context: { clientName: "spawner" } } - ); + + const connect = trpc.spawner.connectRuntime.useMutation(); + const requestKernelStatus = trpc.spawner.requestKernelStatus.useMutation(); + const interruptKernel = trpc.spawner.interruptKernel.useMutation(); useEffect(() => { // if the runtime is disconnected, keep trying to connect. @@ -633,11 +595,9 @@ const RuntimeItem = ({ runtimeId }) => { const interval = setInterval( () => { console.log("try connecting to runtime", runtimeId); - connect({ - variables: { - runtimeId, - repoId, - }, + connect.mutate({ + runtimeId, + repoId, }); }, // ping every 3 seconds @@ -695,10 +655,8 @@ const RuntimeItem = ({ runtimeId }) => { { - requestKernelStatus({ - variables: { - runtimeId, - }, + requestKernelStatus.mutate({ + runtimeId, }); }} > @@ -708,10 +666,8 @@ const RuntimeItem = ({ runtimeId }) => { { - interruptKernel({ - variables: { - runtimeId, - }, + interruptKernel.mutate({ + runtimeId, }); }} > @@ -724,28 +680,22 @@ const RuntimeItem = ({ runtimeId }) => { ); }; -const YjsRuntimeStatus = () => { +const RuntimeStatus = () => { const store = useContext(RepoContext)!; const repoId = useStore(store, (state) => state.repoId); const runtimeMap = useStore(store, (state) => state.getRuntimeMap()); // Observe runtime change const runtimeChanged = useStore(store, (state) => state.runtimeChanged); const ids = Array.from(runtimeMap.keys()); - const [spawnRuntime] = useMutation( - gql` - mutation SpawnRuntime($runtimeId: String, $repoId: String) { - spawnRuntime(runtimeId: $runtimeId, repoId: $repoId) - } - `, - { context: { clientName: "spawner" } } - ); + const spawnRuntime = trpc.spawner.spawnRuntime.useMutation(); + return ( <> Runtime - {/* The header. */} - + {/* The sidebar */} state.setEditMode); setEditMode("edit"); @@ -385,20 +358,14 @@ function WaitForProvider({ children, yjsWsUrl }) { } export function Repo({ yjsWsUrl }) { - let { id } = useParams(); const store = useRef(createRepoStore()).current; - const setRepo = useStore(store, (state) => state.setRepo); - // console.log("load store", useRef(createRepoStore())); - useEffect(() => { - setRepo(id!); - }, []); return ( - + - + = 10'} dev: false + /@trpc/client@10.43.0(@trpc/server@10.43.0): + resolution: {integrity: sha512-8LbSpPHmIseb/Ke+GzL45y0itkKunGQWfxqHf2uy69RSRvER0vj+Gu67L8YD86FBgc/nsX/6GBuJiUet5lIDIw==} + peerDependencies: + '@trpc/server': 10.43.0 + dependencies: + '@trpc/server': 10.43.0 + dev: false + + /@trpc/react-query@10.43.0(@tanstack/react-query@4.18.0)(@trpc/client@10.43.0)(@trpc/server@10.43.0)(react-dom@18.2.0)(react@18.2.0): + resolution: {integrity: sha512-5+pBnnV9QqnwpO5Li9T60hXHylV1USEjFkRjrnd7mJul/t2dKxs5ouv+YDRnOJ36oD9wDki5lBGk2sOgFrzGUw==} + peerDependencies: + '@tanstack/react-query': ^4.18.0 + '@trpc/client': 10.43.0 + '@trpc/server': 10.43.0 + react: '>=16.8.0' + react-dom: '>=16.8.0' + dependencies: + '@tanstack/react-query': 4.18.0(react-dom@18.2.0)(react@18.2.0) + '@trpc/client': 10.43.0(@trpc/server@10.43.0) + '@trpc/server': 10.43.0 + react: 18.2.0 + react-dom: 18.2.0(react@18.2.0) + dev: false + + /@trpc/server@10.43.0: + resolution: {integrity: sha512-1/h9KCPkTNNmpN5VKfKO4kPcl/W4Y9VQla4YGg4pydSh/+4b//0IPfvk3Oz4tz/tvWyAUlBKkBVhD3GfDLcAQA==} + dev: false + /@tsconfig/node10@1.0.9: resolution: {integrity: sha512-jNsYVVxU8v5g43Erja32laIDHXeoNvFEpX33OK4d6hljo3jDhCBDhx5dhCCTMWUojscpAagGiRkBKxpdl9fxqA==} dev: true @@ -10426,6 +10500,10 @@ packages: node-gyp-build: 4.6.1 dev: false + /zod@3.22.4: + resolution: {integrity: sha512-iC+8Io04lddc+mVqQ9AZ7OQ2MrUKGN+oIQyq1vemgt46jwCwLfhq7/pwnBnNXXXZb8VTVLKwp9EDkx+ryxIWmg==} + dev: false + /zustand@4.4.1(@types/react@18.2.15)(immer@10.0.2)(react@18.2.0): resolution: {integrity: sha512-QCPfstAS4EBiTQzlaGP1gmorkh/UL1Leaj2tdj+zZCZ/9bm0WS7sI2wnfD5lpOszFqWJ1DcPnGoY8RDL61uokw==} engines: {node: '>=12.7.0'}