Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions src/lib/data/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ApplicationFeedItem } from "~/server/db/schema";
import { orpcRouterClient } from "../orpc";
import { createSelectorHooks } from "./createSelectorHooks";

type ApplicationStore = {
export type ApplicationStore = {
reset: () => void;
feedItemsOrder: string[];
setFeedItemsOrder: (itemsOrder: string[]) => void;
Expand Down Expand Up @@ -48,18 +48,23 @@ const vanillaApplicationStore = createStore<ApplicationStore>()(
});

let lastUpdateTime = 0;
const DEBOUNCE_TIME = 500;

for await (const incomingFeedItems of await orpcRouterClient.feedItem.getAll()) {
// const DEBOUNCE_TIME = 50;
// const timeSinceLastUpdate = Date.now() - lastUpdateTime;
// const timeToWait = DEBOUNCE_TIME - timeSinceLastUpdate;
const timeSinceLastUpdate = Date.now() - lastUpdateTime;
const timeToWait = DEBOUNCE_TIME - timeSinceLastUpdate;
const shouldWaitToRender = timeToWait > 0;

// if (timeToWait > 0) {
// await new Promise((res) => setTimeout(res, timeToWait));
// }
const initialItemsDict = shouldWaitToRender
? get().feedItemsDict
: {
...get().feedItemsDict,
};

// console.log("updating");
const initialItemsOrder = shouldWaitToRender
? get().feedItemsOrder
: [...get().feedItemsOrder];

// TODO: create date sorting
const { updatedItemsDict, updatedItemsOrder } =
incomingFeedItems.reduce(
({ updatedItemsDict, updatedItemsOrder }, item) => {
Expand All @@ -75,20 +80,25 @@ const vanillaApplicationStore = createStore<ApplicationStore>()(
};
},
{
updatedItemsDict: { ...get().feedItemsDict },
updatedItemsOrder: [...get().feedItemsOrder],
updatedItemsDict: initialItemsDict,
updatedItemsOrder: initialItemsOrder,
},
);

set({
feedItemsDict: updatedItemsDict,
feedItemsOrder: updatedItemsOrder,
});
lastUpdateTime = Date.now();

if (!shouldWaitToRender) {
lastUpdateTime = Date.now();
}
}
set({
fetchFeedItemsStatus: "success",
fetchFeedItemsLastFetchedAt: Date.now(),
feedItemsDict: { ...get().feedItemsDict },
feedItemsOrder: [...get().feedItemsOrder],
});
},
}),
Expand Down
15 changes: 15 additions & 0 deletions src/lib/sortFeedItems.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { ApplicationStore } from "./data/store";

export function sortFeedItemsOrderByDate(
feedItems: ApplicationStore["feedItemsDict"],
) {
return function (a: string, b: string) {
const itemA = feedItems?.[a];
const itemB = feedItems?.[b];

if (!itemA || !itemB) return 0;

if (itemA.postedAt < itemB.postedAt) return 1;
return -1;
};
}
8 changes: 4 additions & 4 deletions src/server/api/routers/feed-router/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { and, desc, eq, inArray, notInArray, sql } from "drizzle-orm";
import { and, eq, inArray, notInArray, sql } from "drizzle-orm";
import { z } from "zod";
import { parseArrayOfSchema } from "~/lib/schemas/utils";

import { protectedProcedure } from "~/server/orpc/base";
import { prepareArrayChunks } from "~/lib/iterators";
import {
contentCategories,
feedCategories,
Expand All @@ -11,9 +11,9 @@ import {
feedsSchema,
openLocationSchema,
} from "~/server/db/schema";
import { fetchFeedData, fetchNewFeedDetails } from "~/server/rss/fetchFeeds";
import { protectedProcedure } from "~/server/orpc/base";
import { fetchNewFeedDetails } from "~/server/rss/fetchFeeds";
import { findExistingFeedThatMatches } from "./utils";
import { prepareArrayChunks } from "~/lib/iterators";

type BulkImportFromFileSuccess = {
feedUrl: string;
Expand Down
72 changes: 7 additions & 65 deletions src/server/api/routers/feedItemRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import { and, desc, eq, gte, inArray } from "drizzle-orm";
import { z } from "zod";
import { prepareArrayChunks } from "~/lib/iterators";

import { checkFeedItemIsVerticalFromUrl } from "~/server/checkFeedItemIsVertical";
import { type ApplicationFeedItem, feedItems, feeds } from "~/server/db/schema";
import { protectedProcedure } from "~/server/orpc/base";
import { fetchFeedData } from "~/server/rss/fetchFeeds";
import { fetchAndInsertFeedData } from "~/server/rss/fetchFeeds";

const isWithinLastMonth = gte(
feedItems.postedAt,
Expand Down Expand Up @@ -34,73 +33,16 @@ export const getAll = protectedProcedure.handler(async function* ({ context }) {
} as ApplicationFeedItem;
});

// Send existing feed items to user
for (const chunk of prepareArrayChunks(existingApplicationFeedItems, 50)) {
yield chunk;
}

// Get new items, yield

// TODO: split this out such that we can return data from
// each feed as it comes in
const feedData = await fetchFeedData(feedsList);
if (!feedData) {
return;
}

const feedItemList: (typeof feedItems.$inferInsert)[] =
feedData?.flatMap((feed) => {
return feed.items.map((item) => {
return {
feedId: feed.id,
contentId: item.id,
content: item.content ?? "",
title: item.title ?? "",
author: item.author ?? "",
thumbnail: item.thumbnail ?? "",
url: item.url ?? "",
postedAt: new Date(item.publishedDate),
orientation: checkFeedItemIsVerticalFromUrl(item.url),
} satisfies typeof feedItems.$inferInsert;
});
}) ?? [];

const feedItemsList = (
await context.db.transaction(async (tx) => {
return await Promise.all(
feedItemList.map(async (item) => {
try {
return await tx
.insert(feedItems)
.values(item)
.onConflictDoUpdate({
target: [feedItems.url, feedItems.feedId],
set: item,
})
.returning();
} catch {
// For local testing
// console.dir({ ...error }, { depth: null });
}

return null;
}),
);
})
)
.filter(Boolean)
.flat();

const newApplicationFeedItems = feedItemsList.map((item) => {
const feed = feedsList.find((feed) => feed.id === item.feedId);

return {
...item,
platform: feed?.platform ?? "youtube",
} as ApplicationFeedItem;
});

for (const chunk of prepareArrayChunks(newApplicationFeedItems, 50)) {
yield chunk;
// Send new feed items to user as they come in
for await (const feedItems of fetchAndInsertFeedData(context, feedsList)) {
for (const chunk of prepareArrayChunks(feedItems, 50)) {
yield chunk;
}
}

return;
Expand Down
23 changes: 23 additions & 0 deletions src/server/db/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { SQL, getTableColumns, sql } from "drizzle-orm";
import { PgTable } from "drizzle-orm/pg-core";
import { SQLiteTable } from "drizzle-orm/sqlite-core";

export const buildConflictUpdateColumns = <
T extends PgTable | SQLiteTable,
Q extends keyof T["_"]["columns"],
>(
table: T,
columns: Q[],
) => {
const cls = getTableColumns(table);

return columns.reduce(
(acc, column) => {
const colName = cls[column]!.name;
acc[column] = sql.raw(`excluded.${colName}`);

return acc;
},
{} as Record<Q, SQL>,
);
};
4 changes: 3 additions & 1 deletion src/server/orpc/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export async function createRPCContext(opts: { headers: Headers }) {
};
}

const o = os.$context<Awaited<ReturnType<typeof createRPCContext>>>();
export type ORPCContext = Awaited<ReturnType<typeof createRPCContext>>;

const o = os.$context<ORPCContext>();

const timingMiddleware = o.middleware(async ({ next, path }) => {
const start = Date.now();
Expand Down
138 changes: 119 additions & 19 deletions src/server/rss/fetchFeeds.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { DatabaseFeed } from "../db/schema";
import { checkFeedItemIsVerticalFromUrl } from "../checkFeedItemIsVertical";
import { ApplicationFeedItem, DatabaseFeed, feedItems } from "../db/schema";
import { buildConflictUpdateColumns } from "../db/utils";
import { ORPCContext } from "../orpc/base";
import { fetchPeerTubeFeedData } from "./parsers/peertube";
import { fetchUnknownRssFeed } from "./parsers/unknown";
import { fetchWebsiteFeedData } from "./parsers/website";
Expand Down Expand Up @@ -42,23 +45,120 @@ export async function fetchNewFeedDetails(
return feedDetailList;
}

export async function fetchFeedData(
// : Promise<RSSFeed[] | null>

type FeedResult =
| {
success: true;
feedItems: ApplicationFeedItem[];
id: number;
}
| {
success: false;
id: number;
};

export async function* fetchAndInsertFeedData(
context: ORPCContext,
databaseFeeds: DatabaseFeed[],
): Promise<RSSFeed[] | null> {
return (
await Promise.all(
databaseFeeds.map(async (feed) => {
if (feed.platform === "youtube") {
return fetchYouTubeFeedData(feed);
}
if (feed.platform === "peertube") {
return fetchPeerTubeFeedData(feed);
}
if (feed.platform === "website") {
return fetchWebsiteFeedData(feed);
}
return null;
}),
)
).filter(Boolean);
) {
const feedIds = databaseFeeds.map((feed) => feed.id);
const feedPromises = databaseFeeds.map(async (feed): Promise<FeedResult> => {
try {
let feedData: RSSFeed | null = null;

if (feed.platform === "youtube") {
feedData = await fetchYouTubeFeedData(feed);
}
if (feed.platform === "peertube") {
feedData = await fetchPeerTubeFeedData(feed);
}
if (feed.platform === "website") {
feedData = await fetchWebsiteFeedData(feed);
}

if (!feedData || !feedData.items.length) {
return {
success: false,
id: feed.id,
};
}

const feedItemList: (typeof feedItems.$inferInsert)[] =
feedData.items.map((item) => {
return {
feedId: feed.id,
contentId: item.id,
content: item.content ?? "",
title: item.title ?? "",
author: item.author ?? "",
thumbnail: item.thumbnail ?? "",
url: item.url ?? "",
postedAt: new Date(item.publishedDate),
orientation: checkFeedItemIsVerticalFromUrl(item.url),
} satisfies typeof feedItems.$inferInsert;
});

const feedItemsList = (
await context.db
.insert(feedItems)
.values(feedItemList)
.onConflictDoUpdate({
target: [feedItems.url, feedItems.feedId],
set: buildConflictUpdateColumns(feedItems, [
"author",
"content",
"contentId",
"createdAt",
"orientation",
"postedAt",
"thumbnail",
"title",
"url",
]),
})
.returning()
)
.filter(Boolean)
.flat();

const applicationFeedItems = feedItemsList.map((item) => {
const feed = databaseFeeds.find((feed) => feed.id === item.feedId);

return {
...item,
platform: feed?.platform ?? "youtube",
} as ApplicationFeedItem;
});

return {
success: true,
feedItems: applicationFeedItems,
id: feed.id,
};
} catch (err) {
return {
success: false,
id: feed.id,
};
}
});

while (feedPromises.length > 0) {
const result = await Promise.any(feedPromises);

const resultIndex = feedIds.findIndex((id) => id === result.id);
feedPromises.splice(resultIndex, 1);
feedIds.splice(resultIndex, 1);

if (!result.success) {
continue;
}

if (result.success) {
yield result.feedItems;
}
}

return;
}