From 246d0b2a378521d84535bcb387d15b49c0de616a Mon Sep 17 00:00:00 2001 From: hfellerhoff Date: Tue, 25 Nov 2025 02:00:05 -0500 Subject: [PATCH] completed per-feed result streaming --- src/lib/data/store.ts | 34 +++-- src/lib/sortFeedItems.ts | 15 +++ src/server/api/routers/feed-router/index.ts | 8 +- src/server/api/routers/feedItemRouter.ts | 72 +--------- src/server/db/utils.ts | 23 ++++ src/server/orpc/base.ts | 4 +- src/server/rss/fetchFeeds.ts | 138 +++++++++++++++++--- 7 files changed, 193 insertions(+), 101 deletions(-) create mode 100644 src/lib/sortFeedItems.ts create mode 100644 src/server/db/utils.ts diff --git a/src/lib/data/store.ts b/src/lib/data/store.ts index 52ab51b..9c2766a 100644 --- a/src/lib/data/store.ts +++ b/src/lib/data/store.ts @@ -4,7 +4,7 @@ import { ApplicationFeedItem } from "~/server/db/schema"; import { orpcRouterClient } from "../orpc"; import { createSelectorHooks } from "./createSelectorHooks"; -type ApplicationStore = { +export type ApplicationStore = { reset: () => void; feedItemsOrder: string[]; setFeedItemsOrder: (itemsOrder: string[]) => void; @@ -48,18 +48,23 @@ const vanillaApplicationStore = createStore()( }); let lastUpdateTime = 0; + const DEBOUNCE_TIME = 500; + for await (const incomingFeedItems of await orpcRouterClient.feedItem.getAll()) { - // const DEBOUNCE_TIME = 50; - // const timeSinceLastUpdate = Date.now() - lastUpdateTime; - // const timeToWait = DEBOUNCE_TIME - timeSinceLastUpdate; + const timeSinceLastUpdate = Date.now() - lastUpdateTime; + const timeToWait = DEBOUNCE_TIME - timeSinceLastUpdate; + const shouldWaitToRender = timeToWait > 0; - // if (timeToWait > 0) { - // await new Promise((res) => setTimeout(res, timeToWait)); - // } + const initialItemsDict = shouldWaitToRender + ? get().feedItemsDict + : { + ...get().feedItemsDict, + }; - // console.log("updating"); + const initialItemsOrder = shouldWaitToRender + ? get().feedItemsOrder + : [...get().feedItemsOrder]; - // TODO: create date sorting const { updatedItemsDict, updatedItemsOrder } = incomingFeedItems.reduce( ({ updatedItemsDict, updatedItemsOrder }, item) => { @@ -75,8 +80,8 @@ const vanillaApplicationStore = createStore()( }; }, { - updatedItemsDict: { ...get().feedItemsDict }, - updatedItemsOrder: [...get().feedItemsOrder], + updatedItemsDict: initialItemsDict, + updatedItemsOrder: initialItemsOrder, }, ); @@ -84,11 +89,16 @@ const vanillaApplicationStore = createStore()( feedItemsDict: updatedItemsDict, feedItemsOrder: updatedItemsOrder, }); - lastUpdateTime = Date.now(); + + if (!shouldWaitToRender) { + lastUpdateTime = Date.now(); + } } set({ fetchFeedItemsStatus: "success", fetchFeedItemsLastFetchedAt: Date.now(), + feedItemsDict: { ...get().feedItemsDict }, + feedItemsOrder: [...get().feedItemsOrder], }); }, }), diff --git a/src/lib/sortFeedItems.ts b/src/lib/sortFeedItems.ts new file mode 100644 index 0000000..e9130df --- /dev/null +++ b/src/lib/sortFeedItems.ts @@ -0,0 +1,15 @@ +import { ApplicationStore } from "./data/store"; + +export function sortFeedItemsOrderByDate( + feedItems: ApplicationStore["feedItemsDict"], +) { + return function (a: string, b: string) { + const itemA = feedItems?.[a]; + const itemB = feedItems?.[b]; + + if (!itemA || !itemB) return 0; + + if (itemA.postedAt < itemB.postedAt) return 1; + return -1; + }; +} diff --git a/src/server/api/routers/feed-router/index.ts b/src/server/api/routers/feed-router/index.ts index cd49a0e..b37940c 100644 --- a/src/server/api/routers/feed-router/index.ts +++ b/src/server/api/routers/feed-router/index.ts @@ -1,8 +1,8 @@ -import { and, desc, eq, inArray, notInArray, sql } from "drizzle-orm"; +import { and, eq, inArray, notInArray, sql } from "drizzle-orm"; import { z } from "zod"; import { parseArrayOfSchema } from "~/lib/schemas/utils"; -import { protectedProcedure } from "~/server/orpc/base"; +import { prepareArrayChunks } from "~/lib/iterators"; import { contentCategories, feedCategories, @@ -11,9 +11,9 @@ import { feedsSchema, openLocationSchema, } from "~/server/db/schema"; -import { fetchFeedData, fetchNewFeedDetails } from "~/server/rss/fetchFeeds"; +import { protectedProcedure } from "~/server/orpc/base"; +import { fetchNewFeedDetails } from "~/server/rss/fetchFeeds"; import { findExistingFeedThatMatches } from "./utils"; -import { prepareArrayChunks } from "~/lib/iterators"; type BulkImportFromFileSuccess = { feedUrl: string; diff --git a/src/server/api/routers/feedItemRouter.ts b/src/server/api/routers/feedItemRouter.ts index 6804d6c..cdad52f 100644 --- a/src/server/api/routers/feedItemRouter.ts +++ b/src/server/api/routers/feedItemRouter.ts @@ -3,10 +3,9 @@ import { and, desc, eq, gte, inArray } from "drizzle-orm"; import { z } from "zod"; import { prepareArrayChunks } from "~/lib/iterators"; -import { checkFeedItemIsVerticalFromUrl } from "~/server/checkFeedItemIsVertical"; import { type ApplicationFeedItem, feedItems, feeds } from "~/server/db/schema"; import { protectedProcedure } from "~/server/orpc/base"; -import { fetchFeedData } from "~/server/rss/fetchFeeds"; +import { fetchAndInsertFeedData } from "~/server/rss/fetchFeeds"; const isWithinLastMonth = gte( feedItems.postedAt, @@ -34,73 +33,16 @@ export const getAll = protectedProcedure.handler(async function* ({ context }) { } as ApplicationFeedItem; }); + // Send existing feed items to user for (const chunk of prepareArrayChunks(existingApplicationFeedItems, 50)) { yield chunk; } - // Get new items, yield - - // TODO: split this out such that we can return data from - // each feed as it comes in - const feedData = await fetchFeedData(feedsList); - if (!feedData) { - return; - } - - const feedItemList: (typeof feedItems.$inferInsert)[] = - feedData?.flatMap((feed) => { - return feed.items.map((item) => { - return { - feedId: feed.id, - contentId: item.id, - content: item.content ?? "", - title: item.title ?? "", - author: item.author ?? "", - thumbnail: item.thumbnail ?? "", - url: item.url ?? "", - postedAt: new Date(item.publishedDate), - orientation: checkFeedItemIsVerticalFromUrl(item.url), - } satisfies typeof feedItems.$inferInsert; - }); - }) ?? []; - - const feedItemsList = ( - await context.db.transaction(async (tx) => { - return await Promise.all( - feedItemList.map(async (item) => { - try { - return await tx - .insert(feedItems) - .values(item) - .onConflictDoUpdate({ - target: [feedItems.url, feedItems.feedId], - set: item, - }) - .returning(); - } catch { - // For local testing - // console.dir({ ...error }, { depth: null }); - } - - return null; - }), - ); - }) - ) - .filter(Boolean) - .flat(); - - const newApplicationFeedItems = feedItemsList.map((item) => { - const feed = feedsList.find((feed) => feed.id === item.feedId); - - return { - ...item, - platform: feed?.platform ?? "youtube", - } as ApplicationFeedItem; - }); - - for (const chunk of prepareArrayChunks(newApplicationFeedItems, 50)) { - yield chunk; + // Send new feed items to user as they come in + for await (const feedItems of fetchAndInsertFeedData(context, feedsList)) { + for (const chunk of prepareArrayChunks(feedItems, 50)) { + yield chunk; + } } return; diff --git a/src/server/db/utils.ts b/src/server/db/utils.ts new file mode 100644 index 0000000..73b951c --- /dev/null +++ b/src/server/db/utils.ts @@ -0,0 +1,23 @@ +import { SQL, getTableColumns, sql } from "drizzle-orm"; +import { PgTable } from "drizzle-orm/pg-core"; +import { SQLiteTable } from "drizzle-orm/sqlite-core"; + +export const buildConflictUpdateColumns = < + T extends PgTable | SQLiteTable, + Q extends keyof T["_"]["columns"], +>( + table: T, + columns: Q[], +) => { + const cls = getTableColumns(table); + + return columns.reduce( + (acc, column) => { + const colName = cls[column]!.name; + acc[column] = sql.raw(`excluded.${colName}`); + + return acc; + }, + {} as Record, + ); +}; diff --git a/src/server/orpc/base.ts b/src/server/orpc/base.ts index 4c663d2..b087b91 100644 --- a/src/server/orpc/base.ts +++ b/src/server/orpc/base.ts @@ -19,7 +19,9 @@ export async function createRPCContext(opts: { headers: Headers }) { }; } -const o = os.$context>>(); +export type ORPCContext = Awaited>; + +const o = os.$context(); const timingMiddleware = o.middleware(async ({ next, path }) => { const start = Date.now(); diff --git a/src/server/rss/fetchFeeds.ts b/src/server/rss/fetchFeeds.ts index bb52733..179d400 100644 --- a/src/server/rss/fetchFeeds.ts +++ b/src/server/rss/fetchFeeds.ts @@ -1,4 +1,7 @@ -import type { DatabaseFeed } from "../db/schema"; +import { checkFeedItemIsVerticalFromUrl } from "../checkFeedItemIsVertical"; +import { ApplicationFeedItem, DatabaseFeed, feedItems } from "../db/schema"; +import { buildConflictUpdateColumns } from "../db/utils"; +import { ORPCContext } from "../orpc/base"; import { fetchPeerTubeFeedData } from "./parsers/peertube"; import { fetchUnknownRssFeed } from "./parsers/unknown"; import { fetchWebsiteFeedData } from "./parsers/website"; @@ -42,23 +45,120 @@ export async function fetchNewFeedDetails( return feedDetailList; } -export async function fetchFeedData( +// : Promise + +type FeedResult = + | { + success: true; + feedItems: ApplicationFeedItem[]; + id: number; + } + | { + success: false; + id: number; + }; + +export async function* fetchAndInsertFeedData( + context: ORPCContext, databaseFeeds: DatabaseFeed[], -): Promise { - return ( - await Promise.all( - databaseFeeds.map(async (feed) => { - if (feed.platform === "youtube") { - return fetchYouTubeFeedData(feed); - } - if (feed.platform === "peertube") { - return fetchPeerTubeFeedData(feed); - } - if (feed.platform === "website") { - return fetchWebsiteFeedData(feed); - } - return null; - }), - ) - ).filter(Boolean); +) { + const feedIds = databaseFeeds.map((feed) => feed.id); + const feedPromises = databaseFeeds.map(async (feed): Promise => { + try { + let feedData: RSSFeed | null = null; + + if (feed.platform === "youtube") { + feedData = await fetchYouTubeFeedData(feed); + } + if (feed.platform === "peertube") { + feedData = await fetchPeerTubeFeedData(feed); + } + if (feed.platform === "website") { + feedData = await fetchWebsiteFeedData(feed); + } + + if (!feedData || !feedData.items.length) { + return { + success: false, + id: feed.id, + }; + } + + const feedItemList: (typeof feedItems.$inferInsert)[] = + feedData.items.map((item) => { + return { + feedId: feed.id, + contentId: item.id, + content: item.content ?? "", + title: item.title ?? "", + author: item.author ?? "", + thumbnail: item.thumbnail ?? "", + url: item.url ?? "", + postedAt: new Date(item.publishedDate), + orientation: checkFeedItemIsVerticalFromUrl(item.url), + } satisfies typeof feedItems.$inferInsert; + }); + + const feedItemsList = ( + await context.db + .insert(feedItems) + .values(feedItemList) + .onConflictDoUpdate({ + target: [feedItems.url, feedItems.feedId], + set: buildConflictUpdateColumns(feedItems, [ + "author", + "content", + "contentId", + "createdAt", + "orientation", + "postedAt", + "thumbnail", + "title", + "url", + ]), + }) + .returning() + ) + .filter(Boolean) + .flat(); + + const applicationFeedItems = feedItemsList.map((item) => { + const feed = databaseFeeds.find((feed) => feed.id === item.feedId); + + return { + ...item, + platform: feed?.platform ?? "youtube", + } as ApplicationFeedItem; + }); + + return { + success: true, + feedItems: applicationFeedItems, + id: feed.id, + }; + } catch (err) { + return { + success: false, + id: feed.id, + }; + } + }); + + while (feedPromises.length > 0) { + const result = await Promise.any(feedPromises); + + const resultIndex = feedIds.findIndex((id) => id === result.id); + feedPromises.splice(resultIndex, 1); + feedIds.splice(resultIndex, 1); + + if (!result.success) { + continue; + } + + if (result.success) { + yield result.feedItems; + } + } + + return; }