Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/slack_search/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies = [
"fastmcp>=2.0",
"httpx>=0.28",
"pydantic>=2.0",
"pydantic-settings>=2.0",
]

classifiers = [
Expand Down
157 changes: 157 additions & 0 deletions examples/slack_search/scripts/normalize_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Migration script to normalize topic labels in Turso."""

import json
import os

import httpx
from dotenv import load_dotenv

load_dotenv()

# Maps lowercased, stripped topic labels to their canonical form.
# Labels not listed here pass through unchanged (see normalize_topics below).
NORMALIZATION_MAP: dict[str, str] = {
    # deployments (general)
    "prefect 3.x deployment": "deployments",
    "prefect deployment": "deployments",
    "prefect 3.x deployments": "deployments",
    "prefect deployments": "deployments",
    "flow deployment": "deployments",
    # prefect.yaml
    "prefect.yaml configuration": "prefect.yaml",
    "prefect yaml configuration": "prefect.yaml",
    # environment variables
    "environment variable configuration": "environment variables",
    # concurrency (general)
    "concurrency limits": "concurrency",
    "prefect 3.x concurrency limits": "concurrency",
    "concurrency management": "concurrency",
    "prefect 3.x concurrency management": "concurrency",
    # work pools
    "work pools and workers": "work pools",
    "prefect 3.x work pools": "work pools",
    # kubernetes
    "kubernetes deployment": "kubernetes",
    # automations
    "prefect 3.x automations": "automations",
    # prefect version tags (remove version specificity)
    "prefect 3.x": "prefect 3.x",  # keep this one as canonical
    "prefect 2.x": "prefect 2.x",  # keep this one as canonical
}


def get_turso_client() -> tuple[str, str]:
    """Read Turso credentials from the environment.

    Returns:
        (host, token) where host has any ``libsql://`` scheme prefix removed
        and surrounding quotes/whitespace stripped from both values.

    Raises:
        RuntimeError: if TURSO_URL or TURSO_TOKEN is unset or empty.
    """
    turso_url = os.environ.get("TURSO_URL", "").strip().strip('"')
    turso_token = os.environ.get("TURSO_TOKEN", "").strip().strip('"')
    if turso_url.startswith("libsql://"):
        turso_url = turso_url[len("libsql://") :]
    # Fail fast with a clear message instead of producing cryptic HTTP
    # errors later when empty credentials reach the Turso API.
    missing = [
        name
        for name, value in (("TURSO_URL", turso_url), ("TURSO_TOKEN", turso_token))
        if not value
    ]
    if missing:
        raise RuntimeError(
            f"Missing required environment variable(s): {', '.join(missing)}. "
            "Please set them before running this script."
        )
    return turso_url, turso_token


def turso_query(host: str, token: str, sql: str, args: list | None = None) -> list[dict]:
    """Run one SQL statement against Turso's HTTP pipeline API.

    Positional args are bound as text parameters. Returns the result rows
    as a list of column-name -> value dicts.
    """
    statement: dict = {"sql": sql}
    if args:
        statement["args"] = [{"type": "text", "value": str(arg)} for arg in args]

    response = httpx.post(
        f"https://{host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json={"requests": [{"type": "execute", "stmt": statement}, {"type": "close"}]},
        timeout=60,
    )
    response.raise_for_status()

    result = response.json()["results"][0]
    if result["type"] == "error":
        raise Exception(f"Turso error: {result['error']}")

    inner = result["response"]["result"]
    column_names = [col["name"] for col in inner["cols"]]

    def cell_value(cell):
        # Turso wraps scalars as {"type": ..., "value": ...}; NULLs arrive
        # as None and pass through unchanged.
        if isinstance(cell, dict):
            return cell.get("value")
        return cell

    rows: list[dict] = []
    for raw_row in inner["rows"]:
        rows.append({name: cell_value(cell) for name, cell in zip(column_names, raw_row)})
    return rows


def normalize_topics(topics: list[str]) -> list[str]:
    """Normalize a list of topics, preserving order and deduping."""
    canonical_topics: list[str] = []
    for raw in topics:
        cleaned = raw.lower().strip()
        # Fall back to the cleaned label when no canonical mapping exists.
        canonical = NORMALIZATION_MAP.get(cleaned, cleaned)
        if canonical not in canonical_topics:
            canonical_topics.append(canonical)
    return canonical_topics


def main(apply: bool = False):
    """Normalize key_topics labels across all assets rows.

    Dry-run by default: prints a preview of the rewritten topic lists.
    Pass apply=True (``--apply`` on the CLI) to write the updates back.
    """
    host, token = get_turso_client()

    # fetch all rows with key_topics
    print("fetching rows with key_topics...")
    rows = turso_query(
        host,
        token,
        """
        SELECT key, metadata FROM assets
        WHERE metadata LIKE '%key_topics%'
        """,
    )
    print(f"found {len(rows)} rows")

    updates = []
    for row in rows:
        key = row["key"]
        meta_raw = row["metadata"]
        if not meta_raw:
            continue

        # Skip malformed metadata instead of aborting the whole migration.
        try:
            meta = json.loads(meta_raw)
        except json.JSONDecodeError as exc:
            print(f"warning: skipping {key}: invalid metadata JSON ({exc})")
            continue

        old_topics = meta.get("key_topics", [])
        if not old_topics:
            continue

        new_topics = normalize_topics(old_topics)

        if old_topics != new_topics:
            meta["key_topics"] = new_topics
            updates.append((key, json.dumps(meta)))

    print(f"found {len(updates)} rows to update")

    if not updates:
        print("nothing to do")
        return

    # preview first 10
    print("\npreview (first 10):")
    for key, new_meta in updates[:10]:
        meta = json.loads(new_meta)
        print(f"  {meta['key_topics']}")

    if not apply:
        print("\ndry run - pass --apply to execute updates")
        return

    # execute updates, one UPDATE per row with a progress counter
    print(f"\nupdating {len(updates)} rows...")
    for i, (key, new_meta) in enumerate(updates):
        turso_query(host, token, "UPDATE assets SET metadata = ? WHERE key = ?", [new_meta, key])
        print(f"\r  {i + 1}/{len(updates)}", end="", flush=True)
    print()

    print(f"done - updated {len(updates)} rows")


if __name__ == "__main__":
    import sys

    main(apply="--apply" in sys.argv)
65 changes: 60 additions & 5 deletions examples/slack_search/src/slack_search/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,32 @@ class ThreadSummary(BaseModel):
@property
def channel_id(self) -> str:
"""Extract channel ID from key."""
# key format: slack://workspace/bot/BOT_ID/summary/CHANNEL_ID/THREAD_TS
# key: slack://workspace/bot/BOT_ID/summary/CHANNEL_ID/THREAD_TS
# idx: 0 1 2 3 4 5 6 7
parts = self.key.split("/")
if len(parts) >= 6:
return parts[5]
if len(parts) >= 7:
return parts[6]
return ""

@computed_field
@property
def thread_ts(self) -> str:
"""Extract thread timestamp from key."""
parts = self.key.split("/")
if len(parts) >= 7:
return parts[6]
if len(parts) >= 8:
return parts[7]
return ""

@computed_field
@property
def url(self) -> str:
"""Slack thread URL."""
parts = self.key.split("/")
if len(parts) >= 8:
workspace = parts[2]
channel = parts[6]
ts = parts[7].replace(".", "")
return f"https://{workspace}.slack.com/archives/{channel}/p{ts}"
return ""


Expand Down Expand Up @@ -89,6 +102,48 @@ def workspace(self) -> str:
"""Slack workspace name."""
return self.metadata.get("workspace_name", "")

@computed_field
@property
def url(self) -> str:
    """Slack thread URL built from channel_id, thread_ts, and workspace.

    Returns "" when either channel_id or thread_ts is missing.
    """
    if self.channel_id and self.thread_ts:
        # NOTE(review): hardcoded fallback workspace when metadata lacks a
        # workspace name — confirm this default is appropriate.
        ws = self.workspace or "prefect-community"
        # Slack permalinks drop the "." from the ts: p1736412345123456
        ts = self.thread_ts.replace(".", "")
        return f"https://{ws}.slack.com/archives/{self.channel_id}/p{ts}"
    return ""


class SlackMessage(BaseModel):
    """A message from a Slack thread."""

    user: str = ""  # Slack user ID of the author
    text: str = ""  # raw message text
    ts: str = ""  # Slack timestamp, e.g. "1736412345.123456"

    @computed_field
    @property
    def timestamp(self) -> str:
        """Human-readable timestamp, or "" if ts is missing or unparseable.

        Rendered in UTC so output is consistent across environments
        (naive fromtimestamp would depend on the host's local timezone).
        """
        if self.ts:
            from datetime import datetime, timezone

            try:
                dt = datetime.fromtimestamp(float(self.ts), tz=timezone.utc)
                return dt.strftime("%Y-%m-%d %H:%M")
            except (ValueError, OSError):
                pass
        return ""


class ThreadContent(BaseModel):
    """Full thread content from Slack API."""

    channel_id: str  # Slack channel the thread lives in
    thread_ts: str  # timestamp of the thread's root message
    url: str  # permalink to the thread
    messages: list[SlackMessage]  # messages as returned by the Slack API
    message_count: int  # presumably len(messages) at fetch time — confirm at call site


class Stats(BaseModel):
"""Index statistics."""
Expand Down
97 changes: 72 additions & 25 deletions examples/slack_search/src/slack_search/client.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,68 @@
"""Turso client for slack thread search."""

import os
from functools import lru_cache
from typing import Any

import httpx
from pydantic import computed_field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

TURSO_URL = os.environ.get("TURSO_URL", "")
TURSO_TOKEN = os.environ.get("TURSO_TOKEN", "")
VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY", "")

class Settings(BaseSettings):
    """Environment-backed configuration (reads .env, ignores extra keys)."""

    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    turso_url: str = ""
    turso_token: str = ""
    voyage_api_key: str = ""
    slack_api_token: str = ""

    @field_validator("turso_url", "turso_token", "voyage_api_key", "slack_api_token", mode="before")
    @classmethod
    def strip_quotes(cls, v: Any) -> str:
        """Normalize raw env values: None -> "", strip whitespace/quotes."""
        # mode="before" validators can receive None or non-str values;
        # always return a str so the field contract holds.
        if v is None:
            return ""
        if isinstance(v, str):
            return v.strip().strip('"').strip("'")
        return str(v)

    @computed_field
    @property
    def turso_host(self) -> str:
        """Strip libsql:// prefix if present."""
        url = self.turso_url
        if url.startswith("libsql://"):
            url = url[len("libsql://") :]
        return url


@lru_cache
def get_settings() -> Settings:
    """Return the process-wide Settings instance (cached after first call)."""
    return Settings()


async def turso_query(sql: str, args: list | None = None) -> list[dict[str, Any]]:
"""Execute a query against Turso and return rows."""
if not TURSO_URL or not TURSO_TOKEN:
settings = get_settings()
if not settings.turso_url or not settings.turso_token:
raise RuntimeError("TURSO_URL and TURSO_TOKEN must be set")

stmt: dict[str, Any] = {"sql": sql}
if args:
stmt["args"] = [{"type": "text", "value": str(a)} for a in args]

async with httpx.AsyncClient() as client:
response = await client.post(
f"https://{_get_turso_host()}/v2/pipeline",
headers={
"Authorization": f"Bearer {TURSO_TOKEN}",
"Content-Type": "application/json",
},
json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]},
timeout=30,
)
response.raise_for_status()
data = response.json()
payload = {"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}
url = f"https://{settings.turso_host}/v2/pipeline"

response = httpx.post(
url,
headers={
"Authorization": f"Bearer {settings.turso_token}",
"Content-Type": "application/json",
},
json=payload,
timeout=30,
)
Comment on lines +54 to +62
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The turso_query function is declared as async but uses synchronous httpx.post instead of an async client. This should either use httpx.AsyncClient or the function should not be async. The current implementation will block the event loop.

Suggested change
response = httpx.post(
url,
headers={
"Authorization": f"Bearer {settings.turso_token}",
"Content-Type": "application/json",
},
json=payload,
timeout=30,
)
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers={
"Authorization": f"Bearer {settings.turso_token}",
"Content-Type": "application/json",
},
json=payload,
timeout=30,
)

Copilot uses AI. Check for mistakes.
if response.status_code >= 400:
raise RuntimeError(f"Turso HTTP {response.status_code} for {url}: {response.text}")
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message includes the full response text which could potentially expose sensitive information like authentication tokens or internal server details. Consider sanitizing the error message or logging it separately while showing a generic error to users.

Suggested change
raise RuntimeError(f"Turso HTTP {response.status_code} for {url}: {response.text}")
raise RuntimeError(f"Turso HTTP {response.status_code} for {url}")

Copilot uses AI. Check for mistakes.
data = response.json()

result = data["results"][0]
if result["type"] == "error":
Expand All @@ -59,14 +83,15 @@ def extract_value(cell: Any) -> Any:

async def voyage_embed(text: str) -> list[float]:
"""Generate embedding for a query using Voyage AI."""
if not VOYAGE_API_KEY:
settings = get_settings()
if not settings.voyage_api_key:
raise RuntimeError("VOYAGE_API_KEY must be set for semantic search")

async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.voyageai.com/v1/embeddings",
headers={
"Authorization": f"Bearer {VOYAGE_API_KEY}",
"Authorization": f"Bearer {settings.voyage_api_key}",
"Content-Type": "application/json",
},
json={
Expand All @@ -80,3 +105,25 @@ async def voyage_embed(text: str) -> list[float]:
data = response.json()

return data["data"][0]["embedding"]


async def slack_get_thread(channel: str, thread_ts: str) -> list[dict[str, Any]]:
    """Fetch all messages from a Slack thread.

    Follows Slack's cursor pagination so threads longer than one page of
    conversations.replies results are returned in full.

    Raises:
        RuntimeError: if SLACK_API_TOKEN is unset or the Slack API reports
            an error (``ok: false``).
    """
    settings = get_settings()
    if not settings.slack_api_token:
        raise RuntimeError("SLACK_API_TOKEN must be set to fetch thread content")

    messages: list[dict[str, Any]] = []
    cursor: str | None = None
    async with httpx.AsyncClient() as client:
        while True:
            params: dict[str, Any] = {"channel": channel, "ts": thread_ts}
            if cursor:
                params["cursor"] = cursor
            response = await client.get(
                "https://slack.com/api/conversations.replies",
                headers={"Authorization": f"Bearer {settings.slack_api_token}"},
                params=params,
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()

            if not data.get("ok"):
                raise RuntimeError(f"Slack API error: {data.get('error', 'unknown')}")

            messages.extend(data.get("messages", []))
            # An empty/absent next_cursor means the final page was reached.
            cursor = data.get("response_metadata", {}).get("next_cursor")
            if not cursor:
                break

    return messages
Loading