Skip to content

Commit 5c80364

Browse files
add member extract-source: timeline events (#389)
1 parent b3bd603 commit 5c80364

File tree

4 files changed

+67
-8
lines changed

4 files changed

+67
-8
lines changed

apps/stack/src/extract/extract-timeline-events.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import { createMessageHandler } from "@stack/config/create-message";
33
import { z } from "zod";
44

55
import { getTimelineEvents, type Context, type GetTimelineEventsEntities, type GetTimelineEventsSourceControl } from "@acme/extract-functions";
6-
import { mergeRequests, MergeRequestSchema, namespaces, NamespaceSchema, repositories, RepositorySchema, timelineEvents } from "@acme/extract-schema";
6+
import { members, mergeRequests, MergeRequestSchema, namespaces, NamespaceSchema, repositories, repositoriesToMembers, RepositorySchema, timelineEvents } from "@acme/extract-schema";
77
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
88

9-
import { extractMergeRequestsEvent } from "./events";
9+
import { extractMembersEvent, extractMergeRequestsEvent } from "./events";
1010
import { getClerkUserToken } from "./get-clerk-user-token";
1111
import { MessageKind, metadataSchema } from "./messages";
1212
import { getTenantDb, type OmitDb } from "@stack/config/get-tenant-db";
1313
import { Config } from "sst/node/config";
14+
import { filterNewExtractMembers } from "./filter-extract-members";
1415

1516
export const timelineEventsSenderHandler = createMessageHandler({
1617
queueId: 'ExtractQueue',
@@ -29,14 +30,30 @@ export const timelineEventsSenderHandler = createMessageHandler({
2930

3031
context.integrations.sourceControl = await initSourceControl(message.metadata.userId, message.metadata.sourceControl);
3132

33+
const { userId, sourceControl } = message.metadata;
3234
const { mergeRequestId, namespaceId, repositoryId } = message.content;
3335

34-
await getTimelineEvents({
36+
const { members } = await getTimelineEvents({
3537
mergeRequestId,
3638
namespaceId,
3739
repositoryId,
3840
}, { ...context, db: getTenantDb(message.metadata.tenantId) }
3941
);
42+
43+
const memberIds = filterNewExtractMembers(members).map(member => member.id);
44+
if (memberIds.length === 0) return;
45+
46+
await extractMembersEvent.publish({ memberIds }, {
47+
crawlId: message.metadata.crawlId,
48+
version: 1,
49+
caller: 'extract-timeline-events',
50+
sourceControl,
51+
userId,
52+
timestamp: new Date().getTime(),
53+
from: message.metadata.from,
54+
to: message.metadata.to,
55+
tenantId: message.metadata.tenantId,
56+
});
4057
}
4158
});
4259

@@ -51,6 +68,8 @@ const context: OmitDb<Context<
5168
repositories,
5269
mergeRequests,
5370
timelineEvents,
71+
members,
72+
repositoriesToMembers
5473
},
5574
integrations: {
5675
sourceControl: null,

packages/functions/extract/src/get-timeline-events.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { createClient } from '@libsql/client';
44

55
import type { Context } from "./config";
66
import { type GetTimelineEventsEntities, type GetTimelineEventsSourceControl, getTimelineEvents } from "./get-timeline-events";
7-
import { namespaces, repositories, mergeRequests, timelineEvents } from "@acme/extract-schema";
7+
import { namespaces, repositories, mergeRequests, timelineEvents, members, repositoriesToMembers } from "@acme/extract-schema";
88
import type { Repository, Namespace, MergeRequest, NewRepository, NewNamespace, NewMergeRequest } from "@acme/extract-schema";
99
import fs from 'fs';
1010

@@ -70,7 +70,9 @@ beforeAll(async () => {
7070
timelineEvents,
7171
namespaces,
7272
repositories,
73-
mergeRequests
73+
mergeRequests,
74+
members,
75+
repositoriesToMembers
7476
},
7577
integrations: {
7678
sourceControl: { fetchTimelineEvents }

packages/functions/extract/src/get-timeline-events.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { TimelineEvents } from "@acme/extract-schema";
1+
import type { Member, NewMember, TimelineEvents } from "@acme/extract-schema";
22
import type { Entities, ExtractFunction } from "./config"
33
import type { SourceControl } from "@acme/source-control";
44
import { eq, sql } from "drizzle-orm";
@@ -11,10 +11,11 @@ export type GetTimelineEventsInputs = {
1111

1212
export type GetTimelineEventsOutput = {
1313
timelineEvents: TimelineEvents[];
14+
members: Member[];
1415
};
1516

1617
export type GetTimelineEventsSourceControl = Pick<SourceControl, "fetchTimelineEvents">;
17-
export type GetTimelineEventsEntities = Pick<Entities, "namespaces" | "repositories" | "mergeRequests" | "timelineEvents">;
18+
export type GetTimelineEventsEntities = Pick<Entities, "namespaces" | "repositories" | "mergeRequests" | "timelineEvents" | "members" | "repositoriesToMembers">;
1819

1920
export type GetTimelineEventsFunction = ExtractFunction<GetTimelineEventsInputs, GetTimelineEventsOutput, GetTimelineEventsSourceControl, GetTimelineEventsEntities>
2021

@@ -37,6 +38,42 @@ export const getTimelineEvents: GetTimelineEventsFunction = async (
3738

3839
const { timelineEvents } = await integrations.sourceControl.fetchTimelineEvents(repository, namespace, mergeRequest);
3940

41+
const nonCommitEvents = timelineEvents.filter(ev => ev.type !== "committed");
42+
43+
const uniqueTimelineActors = [...nonCommitEvents.reduce((externalIdToActor, event) =>
44+
event.actorId ? externalIdToActor.set(event.actorId, { // actorId is optional due to commit events
45+
externalId: event.actorId,
46+
username: event.actorName,
47+
forgeType: repository.forgeType,
48+
extractedSource: 'timeline',
49+
}) : externalIdToActor, new Map<number, NewMember>()).values()];
50+
51+
52+
const insertedUniqueTimelineActors = uniqueTimelineActors.length === 0 ? [] : await db.transaction(async (tx) => {
53+
return Promise.all(uniqueTimelineActors.map(actor =>
54+
tx.insert(entities.members).values(actor)
55+
.onConflictDoUpdate({
56+
target: [
57+
entities.members.externalId,
58+
entities.members.forgeType
59+
],
60+
set: {
61+
username: actor.username,
62+
_updatedAt: sql`(strftime('%s', 'now'))`,
63+
},
64+
})
65+
.returning()
66+
.get()
67+
));
68+
});
69+
70+
if (insertedUniqueTimelineActors.length > 0) {
71+
await db.insert(entities.repositoriesToMembers)
72+
.values(insertedUniqueTimelineActors.map(member => ({ memberId: member.id, repositoryId })))
73+
.onConflictDoNothing()
74+
.run();
75+
}
76+
4077
const insertedTimelineEvents = await db.transaction(async (tx) => {
4178
return Promise.all(timelineEvents.map(event =>
4279
tx.insert(entities.timelineEvents).values(event)
@@ -54,5 +91,6 @@ export const getTimelineEvents: GetTimelineEventsFunction = async (
5491

5592
return {
5693
timelineEvents: insertedTimelineEvents,
94+
members: insertedUniqueTimelineActors
5795
};
5896
}

packages/schemas/extract/src/members.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export const members = sqliteTable('members', {
1313
name: text('name'),
1414
username: text('username').notNull(),
1515
email: text('email'),
16-
extractedSource: Enum('extracted_source', { enum: ['repository', 'namespace', 'notes'] }),
16+
extractedSource: Enum('extracted_source', { enum: ['repository', 'namespace', 'notes', 'timeline'] }),
1717
_createdAt: integer('__created_at', { mode: 'timestamp' }).default(sql`(strftime('%s', 'now'))`),
1818
_updatedAt: integer('__updated_at', { mode: 'timestamp' }).default(sql`(strftime('%s', 'now'))`),
1919
_extractedAt: integer('__extracted_at', { mode: 'timestamp' }),

0 commit comments

Comments
 (0)