Skip to content

Rteqs/plots df upload ldata #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions runtime/mount/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import plotly.io as pio
import plotly.io._json as pio_json
from duckdb import DuckDBPyConnection
from latch.ldata.path import LPath
from latch.registry.table import Table
from lplots import _inject
from lplots.reactive import Node, Signal, ctx
Expand Down Expand Up @@ -1084,6 +1085,42 @@ async def send_globals_summary(self) -> None:

await self.send({"type": "globals_summary", "summary": summary})

async def upload_ldata(
self,
*,
dst: str,
key: str,
viewer_id: str | None,
cell_id: str | None,
filename: str | None,
) -> None:
# todo(rteqs): figure out how to get value without going through reactive context
df = self.k_globals[key]

pagination_settings = self.lookup_pagination_settings(
cell_id=cell_id, viewer_id=viewer_id, source_key=key
)

if df is None or not isinstance(df, pd.DataFrame):
raise ValueError(
"viewer does not exists or variable is not a pandas DataFrame"
)

res = filter_and_sort(df=df, pagination_settings=pagination_settings)

tmp_dir = Path.home() / ".latch" / "plots"
tmp_dir.mkdir(parents=True, exist_ok=True)

name = filename if filename is not None else key
local_path = tmp_dir / f"./{name}.csv"
res.to_csv(local_path, index=False)

if not local_path.exists():
raise RuntimeError("unable to save dataframe to csv")

# todo(rteqs): stream directly to LData without temp file
LPath(dst).upload_from(local_path)

async def accept(self) -> None:
# print("[kernel] accept")
msg = await self.conn.recv()
Expand Down Expand Up @@ -1291,6 +1328,18 @@ async def accept(self) -> None:
await self.send_globals_summary()
return

if msg["type"] == "upload_ldata":
await self.upload_ldata(
dst=msg["dst"],
key=msg["key"],
viewer_id=msg.get("viewer_id"),
cell_id=msg.get("cell_id"),
filename=msg.get("filename"),
)

await self.send({"type": "upload_ldata"})
return


loop: asyncio.AbstractEventLoop | None = None
shutdown_requested = False
Expand Down