diff --git a/meteor/server/api/ingest/packageInfo.ts b/meteor/server/api/ingest/packageInfo.ts index f1480aa0a9..95685fe73c 100644 --- a/meteor/server/api/ingest/packageInfo.ts +++ b/meteor/server/api/ingest/packageInfo.ts @@ -25,28 +25,28 @@ export async function onUpdatedPackageInfo(packageId: ExpectedPackageId, _doc: P return } - if (pkg.package.listenToPackageInfoUpdates) { - for (const source of pkg.ingestSources) { - switch (source.fromPieceType) { - case ExpectedPackageDBType.PIECE: - case ExpectedPackageDBType.ADLIB_PIECE: - case ExpectedPackageDBType.ADLIB_ACTION: - case ExpectedPackageDBType.BASELINE_ADLIB_PIECE: - case ExpectedPackageDBType.BASELINE_ADLIB_ACTION: - case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: - onUpdatedPackageInfoForRundownDebounce(pkg) - break - case ExpectedPackageDBType.BUCKET_ADLIB: - case ExpectedPackageDBType.BUCKET_ADLIB_ACTION: - onUpdatedPackageInfoForBucketItemDebounce(pkg, source) - break - case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS: - onUpdatedPackageInfoForStudioBaselineDebounce(pkg) - break - default: - assertNever(source) - break - } + for (const source of pkg.ingestSources) { + if (!source.listenToPackageInfoUpdates) continue + + switch (source.fromPieceType) { + case ExpectedPackageDBType.PIECE: + case ExpectedPackageDBType.ADLIB_PIECE: + case ExpectedPackageDBType.ADLIB_ACTION: + case ExpectedPackageDBType.BASELINE_ADLIB_PIECE: + case ExpectedPackageDBType.BASELINE_ADLIB_ACTION: + case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: + onUpdatedPackageInfoForRundownDebounce(pkg) + break + case ExpectedPackageDBType.BUCKET_ADLIB: + case ExpectedPackageDBType.BUCKET_ADLIB_ACTION: + onUpdatedPackageInfoForBucketItemDebounce(pkg, source) + break + case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS: + onUpdatedPackageInfoForStudioBaselineDebounce(pkg) + break + default: + assertNever(source) + break } } } diff --git a/meteor/server/migration/X_X_X.ts b/meteor/server/migration/X_X_X.ts index 9197b1a79b..4c84818710 100644 --- a/meteor/server/migration/X_X_X.ts +++ b/meteor/server/migration/X_X_X.ts @@ -55,6 +55,8 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ pieceId: pkg.pieceId, partId: pkg.partId, segmentId: pkg.segmentId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.ADLIB_ACTION: @@ -64,6 +66,8 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ pieceId: pkg.pieceId, partId: pkg.partId, segmentId: pkg.segmentId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.BASELINE_ADLIB_PIECE: @@ -71,6 +75,8 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ ingestSource = { fromPieceType: pkg.fromPieceType, pieceId: pkg.pieceId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.BASELINE_ADLIB_ACTION: @@ -78,12 +84,16 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ ingestSource = { fromPieceType: pkg.fromPieceType, pieceId: pkg.pieceId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: rundownId = pkg.rundownId ingestSource = { fromPieceType: pkg.fromPieceType, + blueprintPackageId: 
pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.BUCKET_ADLIB: @@ -92,6 +102,8 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ fromPieceType: pkg.fromPieceType, pieceId: pkg.pieceId, pieceExternalId: pkg.pieceExternalId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.BUCKET_ADLIB_ACTION: @@ -100,11 +112,15 @@ export const addSteps = addMigrationSteps(CURRENT_SYSTEM_VERSION, [ fromPieceType: pkg.fromPieceType, pieceId: pkg.pieceId, pieceExternalId: pkg.pieceExternalId, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break case PackagesPreR53.ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS: ingestSource = { fromPieceType: pkg.fromPieceType, + blueprintPackageId: pkg.blueprintPackageId, + listenToPackageInfoUpdates: pkg.listenToPackageInfoUpdates, } break default: diff --git a/meteor/server/publications/pieceContentStatusUI/checkPieceContentStatus.ts b/meteor/server/publications/pieceContentStatusUI/checkPieceContentStatus.ts index a8f43da764..0a67fb2654 100644 --- a/meteor/server/publications/pieceContentStatusUI/checkPieceContentStatus.ts +++ b/meteor/server/publications/pieceContentStatusUI/checkPieceContentStatus.ts @@ -11,9 +11,8 @@ import { VTContent, } from '@sofie-automation/blueprints-integration' import { - ExpectedPackageDBType, getExpectedPackageIdForPieceInstance, - getExpectedPackageIdFromIngestSource, + getExpectedPackageIdNew, } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' import { BucketId, @@ -666,21 +665,7 @@ async function checkPieceContentExpectedPackageStatus( checkedPackageContainers.add(matchedPackageContainer[0]) - const expectedPackageIds = [ - // Synthesize the expected packageId from the piece - getExpectedPackageIdFromIngestSource( - packageOwnerId, - { - fromPieceType: ExpectedPackageDBType.PIECE, - // HACK: This shouldn't be cast as any, because this could be a bucket piece, but that gives the same result - pieceId: piece._id as any, - // HACK: We need a value, but the method doesn't use them.. - partId: piece._id as any, - segmentId: piece._id as any, - }, - expectedPackage._id - ), - ] + const expectedPackageIds = [getExpectedPackageIdNew(packageOwnerId, expectedPackage)] if (piece.pieceInstanceId) { // If this is a PieceInstance, try looking up the PieceInstance first expectedPackageIds.unshift( diff --git a/packages/corelib/src/dataModel/ExpectedPackages.ts b/packages/corelib/src/dataModel/ExpectedPackages.ts index 440ba36f1e..4c3a50818f 100644 --- a/packages/corelib/src/dataModel/ExpectedPackages.ts +++ b/packages/corelib/src/dataModel/ExpectedPackages.ts @@ -1,6 +1,6 @@ import { ExpectedPackage, Time } from '@sofie-automation/blueprints-integration' -import { protectString, unprotectString } from '../protectedString.js' -import { getHash, assertNever } from '../lib.js' +import { protectString } from '../protectedString.js' +import { getHash, hashObj } from '../lib.js' import { AdLibActionId, BucketAdLibActionId, @@ -51,10 +51,12 @@ export interface ExpectedPackageDB { created: Time - package: ReadonlyDeep + package: ReadonlyDeep> - // HACK: This should be ExpectedPackageIngestSource[], but for the first iteration this is limited to a single source - ingestSources: [ExpectedPackageIngestSource] + /** + * The ingest sources that generated this package. 
+ */ + ingestSources: ExpectedPackageIngestSource[] // playoutSources: { // /** Any playout PieceInstance. This is limited to the current and next partInstances */ @@ -62,14 +64,22 @@ export interface ExpectedPackageDB { // } } -export interface ExpectedPackageIngestSourceBucketPiece { +export interface ExpectedPackageIngestSourceBase { + /** The id of the package as known by the blueprints */ + blueprintPackageId: string + + /** Whether the blueprints are listening for updates to packageInfos for this package */ + listenToPackageInfoUpdates: boolean | undefined +} + +export interface ExpectedPackageIngestSourceBucketAdlibPiece extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB /** The Bucket adlib this package belongs to */ pieceId: BucketAdLibId /** The `externalId` of the Bucket adlib this package belongs to */ pieceExternalId: string } -export interface ExpectedPackageIngestSourceBucketAdlibAction { +export interface ExpectedPackageIngestSourceBucketAdlibAction extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION /** The Bucket adlib-action this package belongs to */ pieceId: BucketAdLibActionId @@ -77,7 +87,7 @@ export interface ExpectedPackageIngestSourceBucketAdlibAction { pieceExternalId: string } -export interface ExpectedPackageIngestSourcePiece { +export interface ExpectedPackageIngestSourcePiece extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.PIECE | ExpectedPackageDBType.ADLIB_PIECE /** The Piece this package belongs to */ pieceId: PieceId @@ -86,7 +96,7 @@ export interface ExpectedPackageIngestSourcePiece { /** The Segment this package belongs to */ segmentId: SegmentId } -export interface ExpectedPackageIngestSourceAdlibAction { +export interface ExpectedPackageIngestSourceAdlibAction extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.ADLIB_ACTION /** The Piece this package belongs to */ pieceId: AdLibActionId @@ -95,21 +105,21 @@ export interface ExpectedPackageIngestSourceAdlibAction { /** The Segment this package belongs to */ segmentId: SegmentId } -export interface ExpectedPackageIngestSourceBaselineAdlibPiece { +export interface ExpectedPackageIngestSourceBaselineAdlibPiece extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_PIECE /** The Piece this package belongs to */ pieceId: PieceId } -export interface ExpectedPackageIngestSourceBaselineAdlibAction { +export interface ExpectedPackageIngestSourceBaselineAdlibAction extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_ACTION /** The Piece this package belongs to */ pieceId: RundownBaselineAdLibActionId } -export interface ExpectedPackageIngestSourceBaselineObjects { +export interface ExpectedPackageIngestSourceBaselineObjects extends ExpectedPackageIngestSourceBase { fromPieceType: ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS } -export interface ExpectedPackageIngestSourceStudioBaseline { +export interface ExpectedPackageIngestSourceStudioBaseline extends ExpectedPackageIngestSourceBase { // Future: Technically this is a playout source, but for now it needs to be treated as an ingest source fromPieceType: ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS } @@ -117,7 +127,7 @@ export interface ExpectedPackageIngestSourceStudioBaseline { export type ExpectedPackageIngestSourcePart = ExpectedPackageIngestSourcePiece | ExpectedPackageIngestSourceAdlibAction export type 
ExpectedPackageIngestSourceBucket =
-	| ExpectedPackageIngestSourceBucketPiece
+	| ExpectedPackageIngestSourceBucketAdlibPiece
 	| ExpectedPackageIngestSourceBucketAdlibAction
 
 export type ExpectedPackageIngestSourceRundownBaseline =
@@ -145,56 +155,21 @@ export function getExpectedPackageIdForPieceInstance(
 }
 
 /**
- * Generate the temporary expectedPackageId for the given package.
- * Note: This will soon be replaced with a new flow based on the contentVersionHash once shared ownership is implemented.
+ * Generate the expectedPackageId for the given expectedPackage.
+ * This is a stable id derived from the package and its parent. The resulting document is expected to be owned by multiple sources.
  */
-export function getExpectedPackageIdFromIngestSource(
+export function getExpectedPackageIdNew(
 	/** Preferably a RundownId or BucketId, but StudioId is allowed when not owned by a rundown or bucket */
 	parentId: RundownId | StudioId | BucketId,
-	source: ExpectedPackageIngestSource,
 	/** The locally unique id of the expectedPackage */
-	localExpectedPackageId: ExpectedPackage.Base['_id']
+	expectedPackage: ReadonlyDeep<Omit<ExpectedPackage.Base, 'listenToPackageInfoUpdates'>>
 ): ExpectedPackageId {
-	let ownerId: string
-	const ownerPieceType = source.fromPieceType
-	switch (source.fromPieceType) {
-		case ExpectedPackageDBType.PIECE:
-		case ExpectedPackageDBType.ADLIB_PIECE:
-		case ExpectedPackageDBType.ADLIB_ACTION:
-		case ExpectedPackageDBType.BASELINE_ADLIB_PIECE:
-		case ExpectedPackageDBType.BASELINE_ADLIB_ACTION:
-			ownerId = unprotectString(source.pieceId)
-			break
-		case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS:
-			ownerId = 'rundownBaselineObjects'
-			break
-		case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS:
-			ownerId = 'studioBaseline'
-			break
-		case ExpectedPackageDBType.BUCKET_ADLIB:
-		case ExpectedPackageDBType.BUCKET_ADLIB_ACTION:
-			ownerId = unprotectString(source.pieceId)
-			break
-
-		default:
-			assertNever(source)
-			throw new Error(`Unknown fromPieceType "${ownerPieceType}"`)
-	}
-	return protectString(`${parentId}_${ownerId}_${getHash(localExpectedPackageId)}`)
+	// This may be too aggressive, but we don't know how to merge some of the properties
+	const objHash = hashObj({
+		...expectedPackage,
+		_id: '', // Ignore the _id, this is not guaranteed to be stable
+		listenToPackageInfoUpdates: false, // Not relevant for the hash
+	} satisfies ReadonlyDeep<ExpectedPackage.Base>)
+
+	return protectString(`${parentId}_${getHash(objHash)}`)
 }
-
-// Future implementation of id generation, once shared ownership is implemented
-// export function getExpectedPackageIdNew(
-// 	/** _id of the rundown*/
-// 	rundownId: RundownId,
-// 	/** The locally unique id of the expectedPackage */
-// 	expectedPackage: ReadonlyDeep<ExpectedPackage.Base>
-// ): ExpectedPackageId {
-// 	// This may be too agressive, but we don't know how to merge some of the properties
-// 	const objHash = hashObj({
-// 		...expectedPackage,
-// 		listenToPackageInfoUpdates: false, // Not relevant for the hash
-// 	} satisfies ReadonlyDeep<ExpectedPackage.Base>)
-//
-// 	return protectString(`${rundownId}_${getHash(objHash)}`)
-// }
diff --git a/packages/job-worker/src/blueprints/context/watchedPackages.ts b/packages/job-worker/src/blueprints/context/watchedPackages.ts
index edf97c9377..7c690f45c0 100644
--- a/packages/job-worker/src/blueprints/context/watchedPackages.ts
+++ b/packages/job-worker/src/blueprints/context/watchedPackages.ts
@@ -15,7 +15,7 @@ import type { ExpectedPackageIngestSource } from '@sofie-automation/corelib/dist
 export class WatchedPackagesHelper {
	private readonly packages = new Map<
 		ExpectedPackageId,
-		ReadonlyDeep<IngestExpectedPackage<ExpectedPackageIngestSource>>
+		ReadonlyDeep<IngestExpectedPackage<ExpectedPackageIngestSource>>[]
 	>()
private constructor( @@ -23,7 +23,9 @@ export class WatchedPackagesHelper { private readonly packageInfos: ReadonlyDeep ) { for (const pkg of packages) { - this.packages.set(pkg._id, pkg) + const arr = this.packages.get(pkg.packageId) || [] + arr.push(pkg) + this.packages.set(pkg.packageId, arr) } } @@ -65,7 +67,7 @@ export class WatchedPackagesHelper { return expectedPackage.ingestSources.map( (source) => ({ - _id: expectedPackage._id, + packageId: expectedPackage._id, package: expectedPackage.package, source: source, }) satisfies IngestExpectedPackage @@ -97,7 +99,7 @@ export class WatchedPackagesHelper { return this.#createFromPackages( context, - packages.filter((pkg) => !!pkg.package.listenToPackageInfoUpdates) + packages.filter((pkg) => !!pkg.source.listenToPackageInfoUpdates) ) } @@ -125,7 +127,7 @@ export class WatchedPackagesHelper { return this.#createFromPackages( context, - packages.filter((pkg) => !!pkg.package.listenToPackageInfoUpdates) + packages.filter((pkg) => !!pkg.source.listenToPackageInfoUpdates) ) } @@ -135,7 +137,7 @@ export class WatchedPackagesHelper { packages.length > 0 ? await context.directCollections.PackageInfos.findFetch({ studioId: context.studio._id, - packageId: { $in: packages.map((p) => p._id) }, + packageId: { $in: packages.map((p) => p.packageId) }, }) : [] @@ -144,7 +146,8 @@ export class WatchedPackagesHelper { /** * Create a new helper with a subset of the data in the current helper. - * This is useful so that all the data for a rundown can be loaded at the start of an ingest operation, and then subsets can be taken for particular blueprint methods without needing to do more db operations. + * This is useful so that all the data for a rundown can be loaded at the start of an ingest operation, + * and then subsets can be taken for particular blueprint methods without needing to do more db operations. * @param func A filter to check if each package should be included */ filter( @@ -152,11 +155,13 @@ export class WatchedPackagesHelper { func: (pkg: ReadonlyDeep>) => boolean ): WatchedPackagesHelper { const watchedPackages: ReadonlyDeep>[] = [] - for (const pkg of this.packages.values()) { - if (func(pkg)) watchedPackages.push(pkg) + for (const packages of this.packages.values()) { + for (const pkg of packages) { + if (func(pkg)) watchedPackages.push(pkg) + } } - const newPackageIds = new Set(watchedPackages.map((p) => p._id)) + const newPackageIds = new Set(watchedPackages.map((p) => p.packageId)) const watchedPackageInfos = this.packageInfos.filter((info) => newPackageIds.has(info.packageId)) return new WatchedPackagesHelper(watchedPackages, watchedPackageInfos) @@ -166,11 +171,16 @@ export class WatchedPackagesHelper { return this.packages.has(packageId) } - getPackageInfo(packageId: string): Readonly> { - for (const pkg of this.packages.values()) { - if (pkg.package._id === packageId) { - const info = this.packageInfos.filter((p) => p.packageId === pkg._id) - return unprotectObjectArray(info) + getPackageInfo(blueprintPackageId: string): Readonly> { + // Perhaps this should do some scoped source checks, but this should not be necessary. + // The caller should be ensuring that this helper has been filtered to only contain relevant packages + for (const packages of this.packages.values()) { + for (const pkg of packages) { + // Note: This finds the first package with the same blueprintPackageId. There could be multiple if the blueprints don't respect the uniqueness rules. 
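+				// (Illustrative, hypothetical case: pieces A and B both declare a package with
+				// blueprintPackageId 'clip1' but different content. They hash to different
+				// ExpectedPackageIds, both with source.blueprintPackageId === 'clip1', and this
+				// lookup returns the packageInfos of whichever package is iterated first.)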
+ if (pkg.source.blueprintPackageId === blueprintPackageId) { + const info = this.packageInfos.filter((p) => p.packageId === pkg.packageId) + return unprotectObjectArray(info) + } } } diff --git a/packages/job-worker/src/blueprints/postProcess.ts b/packages/job-worker/src/blueprints/postProcess.ts index 5ec47e2f4f..563f0d550d 100644 --- a/packages/job-worker/src/blueprints/postProcess.ts +++ b/packages/job-worker/src/blueprints/postProcess.ts @@ -41,7 +41,7 @@ import { interpollateTranslation, wrapTranslatableMessageFromBlueprints, } from '@sofie-automation/corelib/dist/TranslatableMessage' -import { setDefaultIdOnExpectedPackages } from '../ingest/expectedPackages.js' +import { sanitiseExpectedPackages } from '../ingest/expectedPackages.js' import { logger } from '../logging.js' import { validateTimeline } from 'superfly-timeline' import { ReadonlyDeep } from 'type-fest' @@ -137,7 +137,7 @@ export function postProcessPieces( piece.timelineObjectsString = serializePieceTimelineObjectsBlob(timelineObjects) // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(piece.expectedPackages) + sanitiseExpectedPackages(piece.expectedPackages) return piece }) @@ -267,7 +267,7 @@ export function postProcessAdLibPieces( piece.timelineObjectsString = serializePieceTimelineObjectsBlob(timelineObjects) // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(piece.expectedPackages) + sanitiseExpectedPackages(piece.expectedPackages) return piece }) @@ -304,7 +304,7 @@ export function postProcessGlobalAdLibActions( ) // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(action.expectedPackages) + sanitiseExpectedPackages(action.expectedPackages) return literal({ ...action, @@ -345,7 +345,7 @@ export function postProcessAdLibActions( ) // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(action.expectedPackages) + sanitiseExpectedPackages(action.expectedPackages) return literal({ ...action, @@ -425,7 +425,7 @@ export function postProcessBucketAdLib( timelineObjectsString: EmptyPieceTimelineObjectsBlob, } // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(piece.expectedPackages) + sanitiseExpectedPackages(piece.expectedPackages) const timelineObjects = postProcessTimelineObjects(piece._id, blueprintId, itemOrig.content.timelineObjects) piece.timelineObjectsString = serializePieceTimelineObjectsBlob(timelineObjects) @@ -474,7 +474,7 @@ export function postProcessBucketAction( } // Fill in ids of unnamed expectedPackages - setDefaultIdOnExpectedPackages(action.expectedPackages) + sanitiseExpectedPackages(action.expectedPackages) return action } diff --git a/packages/job-worker/src/ingest/bucket/bucketAdlibs.ts b/packages/job-worker/src/ingest/bucket/bucketAdlibs.ts index dd19f14023..3f99d92918 100644 --- a/packages/job-worker/src/ingest/bucket/bucketAdlibs.ts +++ b/packages/job-worker/src/ingest/bucket/bucketAdlibs.ts @@ -10,7 +10,6 @@ import { } from '@sofie-automation/corelib/dist/worker/ingest' import { cleanUpExpectedPackagesForBucketAdLibs, - cleanUpExpectedPackagesForBucketAdLibsActions, updateExpectedPackagesForBucketAdLibPiece, updateExpectedPackagesForBucketAdLibAction, } from '../expectedPackages.js' @@ -59,7 +58,7 @@ export async function handleBucketRemoveAdlibAction( await Promise.all([ context.directCollections.BucketAdLibActions.remove({ _id: { $in: idsToUpdate } }), cleanUpExpectedMediaItemForBucketAdLibActions(context, idsToUpdate), - 
cleanUpExpectedPackagesForBucketAdLibsActions(context, action.bucketId, idsToUpdate), + cleanUpExpectedPackagesForBucketAdLibs(context, action.bucketId, idsToUpdate), ]) } diff --git a/packages/job-worker/src/ingest/bucket/import.ts b/packages/job-worker/src/ingest/bucket/import.ts index 5698d5975f..e5ef14407c 100644 --- a/packages/job-worker/src/ingest/bucket/import.ts +++ b/packages/job-worker/src/ingest/bucket/import.ts @@ -12,7 +12,6 @@ import { getSystemVersion } from '../../lib/index.js' import { BucketItemImportProps, BucketItemRegenerateProps } from '@sofie-automation/corelib/dist/worker/ingest' import { cleanUpExpectedPackagesForBucketAdLibs, - cleanUpExpectedPackagesForBucketAdLibsActions, updateExpectedPackagesForBucketAdLibPiece, updateExpectedPackagesForBucketAdLibAction, } from '../expectedPackages.js' @@ -253,7 +252,7 @@ async function regenerateBucketItemFromIngestInfo( ps.push( cleanUpExpectedMediaItemForBucketAdLibActions(context, actionIdsToRemoveArray), - cleanUpExpectedPackagesForBucketAdLibsActions(context, bucketId, actionIdsToRemoveArray), + cleanUpExpectedPackagesForBucketAdLibs(context, bucketId, actionIdsToRemoveArray), context.directCollections.BucketAdLibActions.remove({ _id: { $in: actionIdsToRemoveArray } }) ) } diff --git a/packages/job-worker/src/ingest/expectedPackages.ts b/packages/job-worker/src/ingest/expectedPackages.ts index e5f346cd7d..57a55fd592 100644 --- a/packages/job-worker/src/ingest/expectedPackages.ts +++ b/packages/job-worker/src/ingest/expectedPackages.ts @@ -3,11 +3,12 @@ import { BucketAdLib } from '@sofie-automation/corelib/dist/dataModel/BucketAdLi import { ExpectedPackageDBType, ExpectedPackageDB, - getExpectedPackageIdFromIngestSource, ExpectedPackageIngestSource, + getExpectedPackageIdNew, + ExpectedPackageIngestSourceBucketAdlibAction, + ExpectedPackageIngestSourceBucketAdlibPiece, } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' import { BucketId, BucketAdLibId, BucketAdLibActionId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { saveIntoDb } from '../db/changes.js' import { PlayoutModel } from '../playout/model/PlayoutModel.js' import { StudioPlayoutModel } from '../studio/model/StudioPlayoutModel.js' import { ReadonlyDeep } from 'type-fest' @@ -24,7 +25,8 @@ import { import { JobContext, JobStudio } from '../jobs/index.js' import { IngestModel } from './model/IngestModel.js' import { IngestPartModel } from './model/IngestPartModel.js' -import { clone, hashObj } from '@sofie-automation/corelib/dist/lib' +import { hashObj } from '@sofie-automation/corelib/dist/lib' +import { AnyBulkWriteOperation } from 'mongodb' export function updateExpectedMediaAndPlayoutItemsForPartModel(context: JobContext, part: IngestPartModel): void { updateExpectedMediaItemsForPartModel(context, part) @@ -45,7 +47,7 @@ function generateExpectedPackagesForBucketAdlib(studio: ReadonlyDeep, if (adlib.expectedPackages) { packages.push( - ...generateBucketExpectedPackages( + ...generateBucketExpectedPackages( studio, adlib.bucketId, { @@ -65,7 +67,7 @@ function generateExpectedPackagesForBucketAdlibAction(studio: ReadonlyDeep( studio, action.bucketId, { @@ -80,140 +82,188 @@ function generateExpectedPackagesForBucketAdlibAction(studio: ReadonlyDeep( studio: ReadonlyDeep, bucketId: BucketId, - source: ExpectedPackageIngestSource, + source: Omit, expectedPackages: ReadonlyDeep ): ExpectedPackageDB[] { const bases: ExpectedPackageDB[] = [] for (let i = 0; i < expectedPackages.length; i++) { const expectedPackage = 
expectedPackages[i]
-		const id = expectedPackage._id || '__unnamed' + i
+
+		const fullPackage: ReadonlyDeep<ExpectedPackage.Any> = {
+			...expectedPackage,
+			_id: expectedPackage._id || '__unnamed' + i,
+		}
 
 		bases.push({
-			_id: getExpectedPackageIdFromIngestSource(bucketId, source, id),
-			package: {
-				...clone(expectedPackage),
-				_id: id,
-			},
+			_id: getExpectedPackageIdNew(bucketId, fullPackage),
+			package: fullPackage,
 			studioId: studio._id,
 			rundownId: null,
 			bucketId: bucketId,
-			created: Date.now(), // This will be preserved during the `saveIntoDb`
-			ingestSources: [source],
+			created: Date.now(), // This will be preserved during the save if needed
+			ingestSources: [
+				{
+					...(source as any), // Because this is a generic, this spread doesn't work
+					blueprintPackageId: expectedPackage._id,
+					listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
+				},
+			],
 		})
 	}
 	return bases
 }
 
-export async function updateExpectedPackagesForBucketAdLibPiece(
+async function writeUpdatedExpectedPackages(
 	context: JobContext,
-	adlib: BucketAdLib
+	bucketId: BucketId,
+	documentsToSave: ExpectedPackageDB[],
+	matchSource: Partial<ExpectedPackageIngestSource>
 ): Promise<void> {
-	const packages = generateExpectedPackagesForBucketAdlib(context.studio, adlib)
-
-	await saveIntoDb(
-		context,
-		context.directCollections.ExpectedPackages,
-		{
-			studioId: context.studioId,
-			bucketId: adlib.bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB,
-					pieceId: adlib._id,
+	const writeOps: AnyBulkWriteOperation<ExpectedPackageDB>[] = []
+
+	const documentIdsToSave = documentsToSave.map((doc) => doc._id)
+
+	// Find which documents already exist in the database
+	// It would be nice to avoid this, but that would make the update operation incredibly complex
+	// There is no risk of race conditions, as bucket packages are only modified in the ingest job worker
+	const existingDocIds = new Set(
+		(
+			await context.directCollections.ExpectedPackages.findFetch(
+				{
+					_id: { $in: documentIdsToSave },
+					studioId: context.studioId,
+					bucketId: bucketId,
 				},
-			},
-		},
-		packages,
-		{
-			beforeDiff: (obj, oldObj) => {
-				return {
-					...obj,
-					// Preserve old created timestamp
-					created: oldObj.created,
+				{
+					projection: {
+						_id: 1,
+					},
 				}
-			},
-		}
+			)
+		).map((doc) => doc._id)
 	)
+
+	for (const doc of documentsToSave) {
+		if (existingDocIds.has(doc._id)) {
+			// Document already exists, perform an update to merge the source into the existing document
+			writeOps.push({
+				updateOne: {
+					filter: {
+						_id: doc._id,
+						ingestSources: {
+							// This is pretty messy, but we need to make sure that we don't add the same source twice
+							$not: {
+								$elemMatch: matchSource,
+							},
+						},
+					},
+					update: {
+						$push: {
+							ingestSources: doc.ingestSources[0],
+						},
+					},
+				},
+			})
+		} else {
+			// Perform a simple insert
+			writeOps.push({
+				insertOne: {
+					document: doc,
+				},
+			})
+		}
+	}
+
+	// Remove any old references from this source
+	writeOps.push({
+		updateMany: {
+			filter: {
+				studioId: context.studioId,
+				bucketId: bucketId,
+				_id: { $nin: documentIdsToSave },
+			},
+			update: {
+				$pull: {
+					ingestSources: matchSource,
+				},
+			},
+		},
+	})
+
+	await context.directCollections.ExpectedPackages.bulkWrite(writeOps)
+
+	// Check for any packages that no longer have any sources
+	await cleanUpUnusedPackagesInBucket(context, bucketId)
+}
+
+export async function updateExpectedPackagesForBucketAdLibPiece(
+	context: JobContext,
+	adlib: BucketAdLib
+): Promise<void> {
+	const documentsToSave = generateExpectedPackagesForBucketAdlib(context.studio, adlib)
+
+	await writeUpdatedExpectedPackages(context, adlib.bucketId, documentsToSave, {
+		fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB,
+		pieceId: adlib._id,
+	})
 }
 
 export async function updateExpectedPackagesForBucketAdLibAction(
 	context: JobContext,
 	action: BucketAdLibAction
 ): Promise<void> {
-	const packages = generateExpectedPackagesForBucketAdlibAction(context.studio, action)
-
-	await saveIntoDb(
-		context,
-		context.directCollections.ExpectedPackages,
-		{
-			studioId: context.studioId,
-			bucketId: action.bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION,
-					pieceId: action._id,
-				},
-			},
-		},
-		packages,
-		{
-			beforeDiff: (obj, oldObj) => {
-				return {
-					...obj,
-					// Preserve old created timestamp
-					created: oldObj.created,
-				}
-			},
-		}
-	)
+	const documentsToSave = generateExpectedPackagesForBucketAdlibAction(context.studio, action)
+
+	await writeUpdatedExpectedPackages(context, action.bucketId, documentsToSave, {
+		fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION,
+		pieceId: action._id,
+	})
 }
 
 export async function cleanUpExpectedPackagesForBucketAdLibs(
 	context: JobContext,
 	bucketId: BucketId,
-	adLibIds: BucketAdLibId[]
+	adLibIds: Array<BucketAdLibId | BucketAdLibActionId>
 ): Promise<void> {
 	if (adLibIds.length > 0) {
-		await context.directCollections.ExpectedPackages.remove({
-			studioId: context.studioId,
-			bucketId: bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB,
-					pieceId: { $in: adLibIds },
-				},
+		// Remove the claim for the adlibs from any expected packages in the db
+		await context.directCollections.ExpectedPackages.update(
+			{
+				studioId: context.studioId,
+				bucketId: bucketId,
+				// Note: this could have the ingestSources match, but that feels excessive as the $pull performs the same check
 			},
-		})
-	}
-}
-export async function cleanUpExpectedPackagesForBucketAdLibsActions(
-	context: JobContext,
-	bucketId: BucketId,
-	adLibIds: BucketAdLibActionId[]
-): Promise<void> {
-	if (adLibIds.length > 0) {
-		await context.directCollections.ExpectedPackages.remove({
-			studioId: context.studioId,
-			bucketId: bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION,
-					pieceId: { $in: adLibIds },
+			{
+				$pull: {
+					ingestSources: {
+						fromPieceType: {
+							$in: [ExpectedPackageDBType.BUCKET_ADLIB, ExpectedPackageDBType.BUCKET_ADLIB_ACTION],
+						},
+						pieceId: { $in: adLibIds },
+					} as any, // This cast isn't nice, but is needed for some reason
 				},
-			},
-		})
+			}
+		)
+
+		// Remove any expected packages that now have no owners
+		await cleanUpUnusedPackagesInBucket(context, bucketId)
 	}
 }
 
+async function cleanUpUnusedPackagesInBucket(context: JobContext, bucketId: BucketId) {
+	await context.directCollections.ExpectedPackages.remove({
+		studioId: context.studioId,
+		bucketId: bucketId,
+		ingestSources: { $size: 0 },
+		// Future: these currently can't be referenced by playoutSources, but they could be in the future
+	})
+}
+
 export function updateBaselineExpectedPackagesOnStudio(
 	context: JobContext,
 	playoutModel: StudioPlayoutModel | PlayoutModel,
@@ -224,15 +274,10 @@ export function updateBaselineExpectedPackagesOnStudio(
 	playoutModel.setExpectedPackagesForStudioBaseline(baseline.expectedPackages ?? 
[]) } -export function setDefaultIdOnExpectedPackages(expectedPackages: ExpectedPackage.Any[] | undefined): void { +export function sanitiseExpectedPackages(expectedPackages: ExpectedPackage.Any[] | undefined): void { // Fill in ids of unnamed expectedPackage if (expectedPackages) { - for (let i = 0; i < expectedPackages.length; i++) { - const expectedPackage = expectedPackages[i] - if (!expectedPackage._id) { - expectedPackage._id = `__index${i}` - } - + for (const expectedPackage of expectedPackages) { expectedPackage.contentVersionHash = getContentVersionHash(expectedPackage) } } diff --git a/packages/job-worker/src/ingest/generationRundown.ts b/packages/job-worker/src/ingest/generationRundown.ts index 50279b9ad1..df506054af 100644 --- a/packages/job-worker/src/ingest/generationRundown.ts +++ b/packages/job-worker/src/ingest/generationRundown.ts @@ -1,7 +1,9 @@ import { ExpectedPackageDBType, + ExpectedPackageIngestSourceBaselineAdlibAction, + ExpectedPackageIngestSourceBaselineAdlibPiece, + ExpectedPackageIngestSourceBaselineObjects, ExpectedPackageIngestSourceRundownBaseline, - getExpectedPackageIdFromIngestSource, } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' import { BlueprintId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { RundownNote } from '@sofie-automation/corelib/dist/dataModel/Notes' @@ -37,7 +39,7 @@ import { calculateSegmentsAndRemovalsFromIngestData } from './generationSegment. import { SofieIngestRundownWithSource } from '@sofie-automation/corelib/dist/dataModel/SofieIngestDataCache' import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece' import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction' -import { IngestExpectedPackage } from './model/IngestExpectedPackage.js' +import { ExpectedPackageCollector, IngestExpectedPackage } from './model/IngestExpectedPackage.js' export enum GenerateRundownMode { Create = 'create', @@ -344,53 +346,30 @@ function generateExpectedPackagesForBaseline( adLibActions: RundownBaselineAdLibAction[], expectedPackages: ExpectedPackage.Any[] ): IngestExpectedPackage[] { - const packages: IngestExpectedPackage[] = [] + const collector = new ExpectedPackageCollector(rundownId) - const wrapPackage = ( - expectedPackage: ReadonlyDeep, - source: ExpectedPackageIngestSourceRundownBaseline - ): IngestExpectedPackage => { - return { - _id: getExpectedPackageIdFromIngestSource(rundownId, source, expectedPackage._id), + // This expects to generate multiple documents with the same packageId, these get deduplicated during saving. 
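+	// (For example, if the rundown baseline and a baseline adlib both declare a package with
+	// identical content, the collector emits two entries sharing one packageId, and the save
+	// step merges them into a single ExpectedPackageDB document with two ingestSources.)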
+ // This should only concern itself with avoiding duplicates with the same source - package: expectedPackage, - - source: source, - } - } - - // Future: this will need to deduplicate packages with the same content - // For now, we just generate a package for each expectedPackage - - for (const expectedPackage of expectedPackages) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS, - }) - ) - } + collector.addPackagesWithSource(expectedPackages, { + fromPieceType: ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS, + }) // Populate the ingestSources for (const piece of adLibPieces) { - for (const expectedPackage of piece.expectedPackages || []) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_PIECE, - pieceId: piece._id, - }) - ) - } + if (piece.expectedPackages) + collector.addPackagesWithSource(piece.expectedPackages, { + fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_PIECE, + pieceId: piece._id, + }) } for (const piece of adLibActions) { - for (const expectedPackage of piece.expectedPackages || []) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_ACTION, - pieceId: piece._id, - }) - ) - } + if (piece.expectedPackages) + collector.addPackagesWithSource(piece.expectedPackages, { + fromPieceType: ExpectedPackageDBType.BASELINE_ADLIB_ACTION, + pieceId: piece._id, + }) } - return packages + return collector.finish() } diff --git a/packages/job-worker/src/ingest/generationSegment.ts b/packages/job-worker/src/ingest/generationSegment.ts index dc1c7743b8..e3583e8515 100644 --- a/packages/job-worker/src/ingest/generationSegment.ts +++ b/packages/job-worker/src/ingest/generationSegment.ts @@ -191,10 +191,10 @@ async function checkIfSegmentReferencesUnloadedPackageInfos( // check if there are any updates right away? 
 	for (const part of segmentModel.parts) {
 		for (const expectedPackage of part.expectedPackages) {
-			if (expectedPackage.package.listenToPackageInfoUpdates) {
-				if (!segmentWatchedPackages.hasPackage(expectedPackage._id)) {
+			if (expectedPackage.source.listenToPackageInfoUpdates) {
+				if (!segmentWatchedPackages.hasPackage(expectedPackage.packageId)) {
 					// The package didn't exist prior to the blueprint running
-					expectedPackageIdsToCheck.add(expectedPackage._id)
+					expectedPackageIdsToCheck.add(expectedPackage.packageId)
 				}
 			}
 		}
diff --git a/packages/job-worker/src/ingest/model/IngestExpectedPackage.ts b/packages/job-worker/src/ingest/model/IngestExpectedPackage.ts
index 9450a7bd5d..a64a7902d0 100644
--- a/packages/job-worker/src/ingest/model/IngestExpectedPackage.ts
+++ b/packages/job-worker/src/ingest/model/IngestExpectedPackage.ts
@@ -1,10 +1,11 @@
 import type { ExpectedPackage } from '@sofie-automation/blueprints-integration'
-import type {
-	ExpectedPackageDBType,
-	ExpectedPackageIngestSourcePart,
-	ExpectedPackageIngestSourceRundownBaseline,
+import {
+	getExpectedPackageIdNew,
+	type ExpectedPackageDBType,
+	type ExpectedPackageIngestSourcePart,
+	type ExpectedPackageIngestSourceRundownBaseline,
 } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
-import type { ExpectedPackageId } from '@sofie-automation/corelib/dist/dataModel/Ids'
+import type { BucketId, ExpectedPackageId, RundownId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
 import type { ReadonlyDeep } from 'type-fest'
 
 /**
@@ -16,9 +17,50 @@ export interface IngestExpectedPackage<
 	TPackageSource extends { fromPieceType: ExpectedPackageDBType } =
 		| ExpectedPackageIngestSourcePart
 		| ExpectedPackageIngestSourceRundownBaseline,
 > {
-	_id: ExpectedPackageId
+	packageId: ExpectedPackageId
 
-	package: ReadonlyDeep<ExpectedPackage.Any>
+	package: ReadonlyDeep<Omit<ExpectedPackage.Any, 'listenToPackageInfoUpdates'>>
 
 	source: TPackageSource
 }
+
+export class ExpectedPackageCollector<TPackageSource extends { fromPieceType: ExpectedPackageDBType }> {
+	readonly #parentId: RundownId | StudioId | BucketId
+	readonly #packages: IngestExpectedPackage<TPackageSource>[] = []
+
+	constructor(parentId: RundownId | StudioId | BucketId) {
+		this.#parentId = parentId
+	}
+
+	addPackagesWithSource = <TSource extends TPackageSource = never>( // never to force the caller to specify the type
+		expectedPackages: ReadonlyDeep<ExpectedPackage.Any>[],
+		source: Omit<TSource, 'blueprintPackageId' | 'listenToPackageInfoUpdates'>
+	): void => {
+		const insertedPackagesForSource = new Set()
+		for (const expectedPackage of expectedPackages) {
+			const id = getExpectedPackageIdNew(this.#parentId, expectedPackage)
+
+			// Deduplicate with an id including the blueprintPackageId.
+			// This is to ensure the blueprints can still reference the package by that id
+			const uniqueId = `${id}-${expectedPackage._id}-${expectedPackage.listenToPackageInfoUpdates ?? false}`
+
+			// Ensure only inserted once for this source
+			if (insertedPackagesForSource.has(uniqueId)) continue
+			insertedPackagesForSource.add(uniqueId)
+
+			this.#packages.push({
+				packageId: id,
+				package: expectedPackage,
+				source: {
+					...(source as any), // Because this is a generic, this spread doesn't work
+					blueprintPackageId: expectedPackage._id,
+					listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
+				},
+			})
+		}
+	}
+
+	finish(): IngestExpectedPackage<TPackageSource>[] {
+		return this.#packages
+	}
+}
diff --git a/packages/job-worker/src/ingest/model/implementation/ExpectedPackagesStore.ts b/packages/job-worker/src/ingest/model/implementation/ExpectedPackagesStore.ts
index f1b64dd5f3..f99dcb8320 100644
--- a/packages/job-worker/src/ingest/model/implementation/ExpectedPackagesStore.ts
+++ b/packages/job-worker/src/ingest/model/implementation/ExpectedPackagesStore.ts
@@ -2,19 +2,12 @@ import { ExpectedMediaItemRundown } from '@sofie-automation/corelib/dist/dataMod
 import { ExpectedPlayoutItemRundown } from '@sofie-automation/corelib/dist/dataModel/ExpectedPlayoutItem'
 import {
 	ExpectedMediaItemId,
-	ExpectedPackageId,
 	ExpectedPlayoutItemId,
 	PartId,
 	RundownId,
 } from '@sofie-automation/corelib/dist/dataModel/Ids'
 import { ReadonlyDeep } from 'type-fest'
-import {
-	diffAndReturnLatestObjects,
-	DocumentChanges,
-	getDocumentChanges,
-	setValuesAndTrackChanges,
-	setValuesAndTrackChangesFunc,
-} from './utils.js'
+import { diffAndReturnLatestObjects, DocumentChanges, getDocumentChanges, setValuesAndTrackChanges } from './utils.js'
 import type { IngestExpectedPackage } from '../IngestExpectedPackage.js'
 import { ExpectedPackageDBType } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
@@ -25,7 +18,7 @@ export class ExpectedPackagesStore<
 	#expectedMediaItemsWithChanges = new Set<ExpectedMediaItemId>()
 	#expectedPlayoutItemsWithChanges = new Set<ExpectedPlayoutItemId>()
-	#expectedPackagesWithChanges = new Set<ExpectedPackageId>()
+	#expectedPackagesHasChanges = false
 
 	get expectedMediaItems(): ReadonlyDeep<ExpectedMediaItemRundown[]> {
 		return this.#expectedMediaItems
 	}
@@ -42,7 +35,7 @@ export class ExpectedPackagesStore<
 			this.#expectedMediaItemsWithChanges.size > 0 ||
 			this.#expectedPlayoutItemsWithChanges.size > 0 ||
-			this.#expectedPackagesWithChanges.size > 0
+			this.#expectedPackagesHasChanges
 		)
 	}
@@ -52,14 +45,14 @@ export class ExpectedPackagesStore<
 	get expectedPlayoutItemsChanges(): DocumentChanges<ExpectedPlayoutItemRundown> {
 		return getDocumentChanges(this.#expectedPlayoutItemsWithChanges, this.#expectedPlayoutItems)
 	}
 
-	get expectedPackagesChanges(): DocumentChanges<IngestExpectedPackage<TPackageSource>> {
-		return getDocumentChanges(this.#expectedPackagesWithChanges, this.#expectedPackages)
-	}
+	// get expectedPackagesChanges(): DocumentChanges<IngestExpectedPackage<TPackageSource>> {
+	// 	return getDocumentChanges(this.#expectedPackagesWithChanges, this.#expectedPackages)
+	// }
 
 	clearChangedFlags(): void {
 		this.#expectedMediaItemsWithChanges.clear()
 		this.#expectedPlayoutItemsWithChanges.clear()
-		this.#expectedPackagesWithChanges.clear()
+		this.#expectedPackagesHasChanges = false
 	}
 
 	#rundownId: RundownId
@@ -88,9 +81,7 @@ export class ExpectedPackagesStore<
 	updateIngestSources(updatePackageSource: (source: TPackageSource) => boolean): void {
-		setValuesAndTrackChangesFunc(this.#expectedPackagesWithChanges, this.#expectedPackages, (pkg) =>
-			updatePackageSource(pkg.source)
-		)
+		for (const expectedPackage of this.#expectedPackages) {
+			const mutatorChanged = updatePackageSource(expectedPackage.source)
+
+			// The doc changed, track it as such
+			if (mutatorChanged) this.#expectedPackagesHasChanges = true
+		}
 	}
 
 	compareToPreviousData(oldStore: ExpectedPackagesStore<TPackageSource>): void {
@@ -127,11 +121,7 @@ export class ExpectedPackagesStore<
 	setExpectedPackages(expectedPackages: IngestExpectedPackage<TPackageSource>[]): void {
-		this.#expectedPackages = diffAndReturnLatestObjects(
-			this.#expectedPackagesWithChanges,
-			this.#expectedPackages,
-			expectedPackages
-		)
+		this.#expectedPackagesHasChanges = true
+		this.#expectedPackages = [...expectedPackages]
 	}
 }
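A minimal sketch of the deduplication flow these pieces enable (the values `rundownId`, `pieceAId`, `pieceBId`, `partId`, `segmentId` and `mediaPackage` are hypothetical; the types and functions are the ones introduced in this patch):

	const collector = new ExpectedPackageCollector<ExpectedPackageIngestSourcePiece>(rundownId)
	// Two pieces declaring packages with identical content:
	collector.addPackagesWithSource<ExpectedPackageIngestSourcePiece>([mediaPackage], {
		fromPieceType: ExpectedPackageDBType.PIECE,
		pieceId: pieceAId,
		partId: partId,
		segmentId: segmentId,
	})
	collector.addPackagesWithSource<ExpectedPackageIngestSourcePiece>([mediaPackage], {
		fromPieceType: ExpectedPackageDBType.PIECE,
		pieceId: pieceBId,
		partId: partId,
		segmentId: segmentId,
	})
	const packages = collector.finish()
	// Both entries share one packageId, because getExpectedPackageIdNew hashes the package
	// content while ignoring _id and listenToPackageInfoUpdates. The save step
	// (writeExpectedPackagesChangesForRundown below) then merges them into a single
	// ExpectedPackageDB document whose ingestSources lists both pieces.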
diff --git a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts index e8b4a896d2..9c07a51390 100644 --- a/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts +++ b/packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts @@ -341,14 +341,14 @@ export class IngestModelImpl implements IngestModel, IngestDatabasePersistedMode findExpectedPackageIngestSources(packageId: ExpectedPackageId): ReadonlyDeep[] { const sources: ReadonlyDeep[] = [] - const baselinePackage = this.#rundownBaselineExpectedPackagesStore.expectedPackages.find( - (pkg) => pkg._id === packageId - ) - if (baselinePackage) sources.push(baselinePackage.source) + for (const baselinePackage of this.#rundownBaselineExpectedPackagesStore.expectedPackages) { + if (baselinePackage.packageId === packageId) sources.push(baselinePackage.source) + } for (const part of this.getAllOrderedParts()) { - const partPackage = part.expectedPackages.find((pkg) => pkg._id === packageId) - if (partPackage) sources.push(partPackage.source) + for (const partPackage of part.expectedPackages) { + if (partPackage.packageId === packageId) sources.push(partPackage.source) + } } return sources @@ -702,77 +702,39 @@ function groupExpectedPackages(expectedPackages: ExpectedPackageDB[]) { const groupedExpectedPackagesByPart = new Map[]>() for (const expectedPackage of expectedPackages) { - // Future: this is a temporary flow for a single owner - const src = expectedPackage.ingestSources[0] - switch (src.fromPieceType) { - case ExpectedPackageDBType.BASELINE_ADLIB_ACTION: - case ExpectedPackageDBType.BASELINE_ADLIB_PIECE: - case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: - baselineExpectedPackages.push({ - _id: expectedPackage._id, - package: expectedPackage.package, - source: src, - }) - break - case ExpectedPackageDBType.PIECE: - case ExpectedPackageDBType.ADLIB_PIECE: - case ExpectedPackageDBType.ADLIB_ACTION: { - const partPackages = groupedExpectedPackagesByPart.get(src.partId) ?? [] - partPackages.push({ - _id: expectedPackage._id, - package: expectedPackage.package, - source: src, - }) - groupedExpectedPackagesByPart.set(src.partId, partPackages) - break + for (const source of expectedPackage.ingestSources) { + switch (source.fromPieceType) { + case ExpectedPackageDBType.BASELINE_ADLIB_ACTION: + case ExpectedPackageDBType.BASELINE_ADLIB_PIECE: + case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: + baselineExpectedPackages.push({ + packageId: expectedPackage._id, + package: expectedPackage.package, + source: source, + }) + break + case ExpectedPackageDBType.PIECE: + case ExpectedPackageDBType.ADLIB_PIECE: + case ExpectedPackageDBType.ADLIB_ACTION: { + const partPackages = groupedExpectedPackagesByPart.get(source.partId) ?? 
[] + partPackages.push({ + packageId: expectedPackage._id, + package: expectedPackage.package, + source: source, + }) + groupedExpectedPackagesByPart.set(source.partId, partPackages) + break + } + case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS: + case ExpectedPackageDBType.BUCKET_ADLIB: + case ExpectedPackageDBType.BUCKET_ADLIB_ACTION: + // Ignore + break + default: + assertNever(source) + break } - case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS: - case ExpectedPackageDBType.BUCKET_ADLIB: - case ExpectedPackageDBType.BUCKET_ADLIB_ACTION: - // Ignore - break - default: - assertNever(src) - break } - - // Future: once this supports multiple owners - // const baselineIngestSources: ExpectedPackageIngestSourceRundownBaseline[] = [] - // const rundownIngestSources: ExpectedPackageIngestSourcePart[] = [] - // for (const src of expectedPackage.ingestSources) { - // switch (src.fromPieceType) { - // case ExpectedPackageDBType.BASELINE_ADLIB_ACTION: - // case ExpectedPackageDBType.BASELINE_ADLIB_PIECE: - // case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: - // baselineIngestSources.push(src) - // break - // case ExpectedPackageDBType.PIECE: - // case ExpectedPackageDBType.ADLIB_PIECE: - // case ExpectedPackageDBType.ADLIB_ACTION: - // rundownIngestSources.push(src) - // break - // default: - // assertNever(src) - // break - // } - // } - - // if (baselineIngestSources.length > 0) { - // baselineExpectedPackages.push({ - // ...expectedPackage, - // ingestSources: baselineIngestSources, - // }) - // } - - // const sourcesByPartId = groupByToMapFunc(rundownIngestSources, (src) => src.partId) - // for (const [partId, sources] of sourcesByPartId.entries()) { - // const partPackages = groupedExpectedPackagesByPart.get(partId) ?? [] - // partPackages.push({ - // ...expectedPackage, - // ingestSources: sources, - // }) - // groupedExpectedPackagesByPart.set(partId, partPackages) - // } } return { diff --git a/packages/job-worker/src/ingest/model/implementation/IngestPartModelImpl.ts b/packages/job-worker/src/ingest/model/implementation/IngestPartModelImpl.ts index 3c6b5492b6..85ab7f325f 100644 --- a/packages/job-worker/src/ingest/model/implementation/IngestPartModelImpl.ts +++ b/packages/job-worker/src/ingest/model/implementation/IngestPartModelImpl.ts @@ -178,7 +178,7 @@ export class IngestPartModelImpl implements IngestPartModel { /** * This IngestPartModel replaces an existing one. 
- * Run some comparisons to ensure that + * Run some comparisons to ensure that the changed flags are set correctly * @param previousModel */ compareToPreviousModel(previousModel: IngestPartModelImpl): void { diff --git a/packages/job-worker/src/ingest/model/implementation/IngestSegmentModelImpl.ts b/packages/job-worker/src/ingest/model/implementation/IngestSegmentModelImpl.ts index 3cbb11e3b7..69e1832d5f 100644 --- a/packages/job-worker/src/ingest/model/implementation/IngestSegmentModelImpl.ts +++ b/packages/job-worker/src/ingest/model/implementation/IngestSegmentModelImpl.ts @@ -14,11 +14,11 @@ import { clone } from '@sofie-automation/corelib/dist/lib' import { getPartId } from '../../lib.js' import { ExpectedPackageDBType, - getExpectedPackageIdFromIngestSource, + ExpectedPackageIngestSourceAdlibAction, ExpectedPackageIngestSourcePart, + ExpectedPackageIngestSourcePiece, } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' -import type { ExpectedPackage } from '@sofie-automation/blueprints-integration' -import type { IngestExpectedPackage } from '../IngestExpectedPackage.js' +import { ExpectedPackageCollector, type IngestExpectedPackage } from '../IngestExpectedPackage.js' /** * A light wrapper around the IngestPartModel, so that we can track the deletions while still accessing the contents @@ -268,61 +268,39 @@ function generateExpectedPackagesForPart( adLibPieces: AdLibPiece[], adLibActions: AdLibAction[] ): IngestExpectedPackage[] { - const packages: IngestExpectedPackage[] = [] + const collector = new ExpectedPackageCollector(rundownId) - const wrapPackage = ( - expectedPackage: ReadonlyDeep, - source: ExpectedPackageIngestSourcePart - ): IngestExpectedPackage => { - return { - _id: getExpectedPackageIdFromIngestSource(rundownId, source, expectedPackage._id), - - package: expectedPackage, - - source: source, - } - } - - // Future: this will need to deduplicate packages with the same content - // For now, we just generate a package for each expectedPackage + // This expects to generate multiple documents with the same packageId, these get deduplicated during saving. 
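+	// (e.g. a piece and an adlib piece in this part referencing the same content share one
+	// packageId, while the same piece declaring an identical package twice is dropped here
+	// by the collector's per-source uniqueId check.)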
+ // This should only concern itself with avoiding duplicates with the same source // Populate the ingestSources for (const piece of pieces) { - for (const expectedPackage of piece.expectedPackages || []) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.PIECE, - pieceId: piece._id, - partId: partId, - segmentId: segmentId, - }) - ) - } + if (piece.expectedPackages) + collector.addPackagesWithSource(piece.expectedPackages, { + fromPieceType: ExpectedPackageDBType.PIECE, + pieceId: piece._id, + partId: partId, + segmentId: segmentId, + }) } for (const piece of adLibPieces) { - for (const expectedPackage of piece.expectedPackages || []) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.ADLIB_PIECE, - pieceId: piece._id, - partId: partId, - segmentId: segmentId, - }) - ) - } + if (piece.expectedPackages) + collector.addPackagesWithSource(piece.expectedPackages, { + fromPieceType: ExpectedPackageDBType.ADLIB_PIECE, + pieceId: piece._id, + partId: partId, + segmentId: segmentId, + }) } for (const piece of adLibActions) { - for (const expectedPackage of piece.expectedPackages || []) { - packages.push( - wrapPackage(expectedPackage, { - fromPieceType: ExpectedPackageDBType.ADLIB_ACTION, - pieceId: piece._id, - partId: partId, - segmentId: segmentId, - }) - ) - } + if (piece.expectedPackages) + collector.addPackagesWithSource(piece.expectedPackages, { + fromPieceType: ExpectedPackageDBType.ADLIB_ACTION, + pieceId: piece._id, + partId: partId, + segmentId: segmentId, + }) } - return packages + return collector.finish() } diff --git a/packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts b/packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts index 501cb28b5b..386d7ea173 100644 --- a/packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts +++ b/packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts @@ -15,12 +15,12 @@ import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString' import { IngestExpectedPackage } from '../IngestExpectedPackage.js' import { AnyBulkWriteOperation } from 'mongodb' import { ExpectedPackageId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { Complete, normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib' +import { normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib' export class SaveIngestModelHelper { readonly #rundownId: RundownId - #expectedPackages = new DocumentChangeTracker>() + #expectedPackages: IngestExpectedPackage[] = [] #expectedPlayoutItems = new DocumentChangeTracker() #expectedMediaItems = new DocumentChangeTracker() @@ -38,7 +38,7 @@ export class SaveIngestModelHelper { store: ExpectedPackagesStore, deleteAll?: boolean ): void { - this.#expectedPackages.addChanges(store.expectedPackagesChanges, deleteAll ?? false) + if (!deleteAll) this.#expectedPackages.push(...store.expectedPackages) this.#expectedPlayoutItems.addChanges(store.expectedPlayoutItemsChanges, deleteAll ?? false) this.#expectedMediaItems.addChanges(store.expectedMediaItemsChanges, deleteAll ?? 
false) } @@ -68,7 +68,6 @@ export class SaveIngestModelHelper { commit(context: JobContext): Array> { // Log deleted ids: const deletedIds: { [key: string]: ProtectedString[] } = { - expectedPackages: this.#expectedPackages.getDeletedIds(), expectedPlayoutItems: this.#expectedPlayoutItems.getDeletedIds(), expectedMediaItems: this.#expectedMediaItems.getDeletedIds(), segments: this.#segments.getDeletedIds(), @@ -84,11 +83,7 @@ export class SaveIngestModelHelper { } return [ - writeExpectedPackagesChangesForRundown( - context, - this.#rundownId, - Array.from(this.#expectedPackages.getDocumentsToSave().values()) - ), + writeExpectedPackagesChangesForRundown(context, this.#rundownId, this.#expectedPackages), context.directCollections.ExpectedPlayoutItems.bulkWrite(this.#expectedPlayoutItems.generateWriteOps()), context.directCollections.ExpectedMediaItems.bulkWrite(this.#expectedMediaItems.generateWriteOps()), @@ -101,7 +96,7 @@ export class SaveIngestModelHelper { } } -export async function writeExpectedPackagesChangesForRundown( +async function writeExpectedPackagesChangesForRundown( context: JobContext, rundownId: RundownId | null, documentsToSave: IngestExpectedPackage[] @@ -121,59 +116,69 @@ export async function writeExpectedPackagesChangesForRundown( )) as Pick[] const existingDocsMap = normalizeArrayToMap(existingDocs, '_id') - // Generate any insert and update operations - const ops: AnyBulkWriteOperation[] = [] + const packagesToSave = new Map() for (const doc of documentsToSave) { - const newDbDoc: Complete> = { + const partialDoc = packagesToSave.get(doc.packageId) + + if (partialDoc) { + // Add the source to the existing document + partialDoc.ingestSources.push(doc.source) + + // Maybe this should check for duplicates, but the point where these documents are generated should be handling that. + } else { + // Add a new document // Future: omit 'playoutSources from this doc - studioId: context.studioId, - rundownId: rundownId, - bucketId: null, - created: Date.now(), - package: doc.package, - ingestSources: [doc.source], + packagesToSave.set(doc.packageId, { + _id: doc.packageId, + studioId: context.studioId, + rundownId: rundownId, + bucketId: null, + created: Date.now(), + package: doc.package, + ingestSources: [doc.source], + }) } + } + // Generate any insert and update operations + const ops: AnyBulkWriteOperation[] = [] + for (const doc of packagesToSave.values()) { const existingDoc = existingDocsMap.get(doc._id) - if (existingDoc) { + if (!existingDoc) { + // Insert this new document + ops.push({ + insertOne: { + document: doc, + }, + }) + } else { // Document already exists, perform an update to preserve other fields + // Future: would it be beneficial to perform some diffing to only update the field if it has changed? 
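+			// (A sketch of that diffing idea, assuming a hypothetical deep-equality helper and that
+			// ingestSources is added to the minimal projection used for `existingDocs` above:
+			//   if (deepEqual(existingDoc.ingestSources, doc.ingestSources)) continue
+			// would skip the updateOne when nothing changed.)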
ops.push({ updateOne: { filter: { _id: doc._id }, update: { + // Update every field that we want to define $set: { - // Update every field that we want to define - ...newDbDoc, + ingestSources: doc.ingestSources, }, }, }, }) - } else { - // Insert this new document - ops.push({ - insertOne: { - document: { - ...newDbDoc, - _id: doc._id, - }, - }, - }) } } // Look over the existing documents, and see is no longer referenced - const documentsToSaveMap = normalizeArrayToMap(documentsToSave, '_id') const idsToDelete: ExpectedPackageId[] = [] for (const doc of existingDocs) { // Skip if this document is in the list of documents to save - if (documentsToSaveMap.has(doc._id)) continue + if (packagesToSave.has(doc._id)) continue // Future: check for playoutSources idsToDelete.push(doc._id) } - // const idsToDelete = changeTracker.getDeletedIds() if (idsToDelete.length > 0) { ops.push({ deleteMany: { diff --git a/packages/job-worker/src/ingest/model/implementation/utils.ts b/packages/job-worker/src/ingest/model/implementation/utils.ts index a3cfc4a3f9..cec36172f1 100644 --- a/packages/job-worker/src/ingest/model/implementation/utils.ts +++ b/packages/job-worker/src/ingest/model/implementation/utils.ts @@ -67,19 +67,6 @@ export function setValuesAndTrackChanges } } } -export function setValuesAndTrackChangesFunc }>( - changedIds: Set, - objects: readonly T[], - mutator: (obj: T) => boolean -): void { - for (const obj of objects) { - const mutatorChanged = mutator(obj) - - // The doc changed, track it as such - if (mutatorChanged) changedIds.add(obj._id) - } -} - export function addManyToSet(set: Set, iter: Iterable): void { for (const val of iter) { set.add(val) diff --git a/packages/job-worker/src/ingest/packageInfo.ts b/packages/job-worker/src/ingest/packageInfo.ts index 9eb9fc2f39..546889149c 100644 --- a/packages/job-worker/src/ingest/packageInfo.ts +++ b/packages/job-worker/src/ingest/packageInfo.ts @@ -33,6 +33,9 @@ export async function handleUpdatedPackageInfoForRundown( for (const packageId of data.packageIds) { const pkgIngestSources = ingestModel.findExpectedPackageIngestSources(packageId) for (const source of pkgIngestSources) { + // Only consider sources that are marked to listen to package info updates + if (!source.listenToPackageInfoUpdates) continue + switch (source.fromPieceType) { case ExpectedPackageDBType.PIECE: case ExpectedPackageDBType.ADLIB_PIECE: diff --git a/packages/job-worker/src/playout/snapshot.ts b/packages/job-worker/src/playout/snapshot.ts index f674d4bd59..a7c9f288be 100644 --- a/packages/job-worker/src/playout/snapshot.ts +++ b/packages/job-worker/src/playout/snapshot.ts @@ -2,7 +2,7 @@ import { ExpectedPackageDB, ExpectedPackageDBType, ExpectedPackageIngestSource, - getExpectedPackageIdFromIngestSource, + getExpectedPackageIdNew, } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages' import { AdLibActionId, @@ -30,7 +30,7 @@ import { CoreRundownPlaylistSnapshot } from '@sofie-automation/corelib/dist/snap import { unprotectString, ProtectedString, protectString } from '@sofie-automation/corelib/dist/protectedString' import { saveIntoDb } from '../db/changes.js' import { getPartId, getSegmentId } from '../ingest/lib.js' -import { assertNever, getRandomId, literal, omit } from '@sofie-automation/corelib/dist/lib' +import { assertNever, getHash, getRandomId, literal, omit } from '@sofie-automation/corelib/dist/lib' import { logger } from '../logging.js' import { JSONBlobParse, JSONBlobStringify } from '@sofie-automation/shared-lib/dist/lib/JSONBlob' 
diff --git a/packages/job-worker/src/playout/snapshot.ts b/packages/job-worker/src/playout/snapshot.ts
index f674d4bd59..a7c9f288be 100644
--- a/packages/job-worker/src/playout/snapshot.ts
+++ b/packages/job-worker/src/playout/snapshot.ts
@@ -2,7 +2,7 @@ import {
 	ExpectedPackageDB,
 	ExpectedPackageDBType,
 	ExpectedPackageIngestSource,
-	getExpectedPackageIdFromIngestSource,
+	getExpectedPackageIdNew,
 } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
 import {
 	AdLibActionId,
@@ -30,7 +30,7 @@ import { CoreRundownPlaylistSnapshot } from '@sofie-automation/corelib/dist/snapshots'
 import { unprotectString, ProtectedString, protectString } from '@sofie-automation/corelib/dist/protectedString'
 import { saveIntoDb } from '../db/changes.js'
 import { getPartId, getSegmentId } from '../ingest/lib.js'
-import { assertNever, getRandomId, literal, omit } from '@sofie-automation/corelib/dist/lib'
+import { assertNever, getHash, getRandomId, literal, omit } from '@sofie-automation/corelib/dist/lib'
 import { logger } from '../logging.js'
 import { JSONBlobParse, JSONBlobStringify } from '@sofie-automation/shared-lib/dist/lib/JSONBlob'
 import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
@@ -371,6 +371,8 @@ export async function handleRestorePlaylistSnapshot(
 						expectedPackage.segmentId,
 						`expectedPackage.segmentId=${expectedPackage.segmentId}`
 					),
+					blueprintPackageId: expectedPackage.blueprintPackageId,
+					listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
 				}
 				break

@@ -382,6 +384,8 @@ export async function handleRestorePlaylistSnapshot(
 					expectedPackage.pieceId,
 					`expectedPackage.pieceId=${expectedPackage.pieceId}`
 				) as any,
+				blueprintPackageId: expectedPackage.blueprintPackageId,
+				listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
 			}
 			break

@@ -390,6 +394,8 @@ export async function handleRestorePlaylistSnapshot(
 			case PackagesPreR53.ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS: {
 				source = {
 					fromPieceType: expectedPackage.fromPieceType,
+					blueprintPackageId: expectedPackage.blueprintPackageId,
+					listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
 				}
 				break
 			}
@@ -414,6 +420,8 @@ export async function handleRestorePlaylistSnapshot(
 				pieceId: protectString('fakePiece'),
 				partId: protectString('fakePart'),
 				segmentId: protectString('fakeSegment'),
+				blueprintPackageId: expectedPackage.blueprintPackageId,
+				listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
 			}
 		}

@@ -424,10 +432,39 @@ export async function handleRestorePlaylistSnapshot(
 				`expectedPackage.rundownId=${expectedPackage.rundownId}`
 			)
 			: null
-		const newPackageId = getExpectedPackageIdFromIngestSource(
-			packageRundownId || context.studioId,
-			source,
-			expectedPackage.blueprintPackageId
+
+		// Generate a unique id for the package.
+		// This is done differently to ensure we don't have id collisions that the documents aren't expecting
+		// Note: maybe this should do the work to generate in the new deduplicated form, but that likely has no benefit
+		let packageOwnerId: string
+		const ownerPieceType = source.fromPieceType
+		switch (source.fromPieceType) {
+			case ExpectedPackageDBType.PIECE:
+			case ExpectedPackageDBType.ADLIB_PIECE:
+			case ExpectedPackageDBType.ADLIB_ACTION:
+			case ExpectedPackageDBType.BASELINE_ADLIB_PIECE:
+			case ExpectedPackageDBType.BASELINE_ADLIB_ACTION:
+				packageOwnerId = unprotectString(source.pieceId)
+				break
+			case ExpectedPackageDBType.RUNDOWN_BASELINE_OBJECTS:
+				packageOwnerId = 'rundownBaselineObjects'
+				break
+			case ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS:
+				packageOwnerId = 'studioBaseline'
+				break
+			case ExpectedPackageDBType.BUCKET_ADLIB:
+			case ExpectedPackageDBType.BUCKET_ADLIB_ACTION:
+				packageOwnerId = unprotectString(source.pieceId)
+				break
+
+			default:
+				assertNever(source)
+				throw new Error(`Unknown fromPieceType "${ownerPieceType}"`)
+		}
+		const newPackageId = protectString(
+			`${packageRundownId || context.studioId}_${packageOwnerId}_${getHash(
+				expectedPackage.blueprintPackageId
+			)}`
 		)

 		const newExpectedPackage: ExpectedPackageDB = {
@@ -509,10 +546,9 @@ export async function handleRestorePlaylistSnapshot(
 		}

 		// Regenerate the ID from the new rundownId and packageId
-		expectedPackage._id = getExpectedPackageIdFromIngestSource(
+		expectedPackage._id = getExpectedPackageIdNew(
 			expectedPackage.rundownId || expectedPackage.studioId,
-			expectedPackage.ingestSources[0],
-			expectedPackage.package._id
+			expectedPackage.package
 		)

 		expectedPackageIdMap.set(oldId, expectedPackage._id)
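During snapshot restore, ids for legacy-format packages are rebuilt by hand from the owner scope, an owner discriminator derived from the source type, and a hash of the blueprint-assigned package id. A self-contained sketch of that scheme, assuming getHash is a stable string digest (the stand-in below uses Node's crypto; the real helper lives in corelib):

```ts
import { createHash } from 'node:crypto'

// Stand-in for corelib's getHash; assumed to be a stable digest of the input string.
function getHashLike(str: string): string {
	return createHash('sha1').update(str).digest('base64url')
}

// Mirrors `${packageRundownId || context.studioId}_${packageOwnerId}_${getHash(blueprintPackageId)}`
function legacyRestoredPackageId(ownerScope: string, packageOwnerId: string, blueprintPackageId: string): string {
	return `${ownerScope}_${packageOwnerId}_${getHashLike(blueprintPackageId)}`
}
```

Keeping the restored ids in the old per-owner shape avoids collisions with documents that have not yet been rewritten into the new deduplicated form, while current-format packages go through getExpectedPackageIdNew instead.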
diff --git a/packages/job-worker/src/studio/model/StudioBaselineHelper.ts b/packages/job-worker/src/studio/model/StudioBaselineHelper.ts
index ab13dee2b1..89f31fc7f8 100644
--- a/packages/job-worker/src/studio/model/StudioBaselineHelper.ts
+++ b/packages/job-worker/src/studio/model/StudioBaselineHelper.ts
@@ -1,21 +1,24 @@
 import { JobContext } from '../../jobs/index.js'
 import {
+	ExpectedPackageDB,
 	ExpectedPackageDBType,
-	ExpectedPackageIngestSource,
 	ExpectedPackageIngestSourceStudioBaseline,
-	getExpectedPackageIdFromIngestSource,
+	getExpectedPackageIdNew,
 } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
 import { ExpectedPlayoutItemStudio } from '@sofie-automation/corelib/dist/dataModel/ExpectedPlayoutItem'
 import { saveIntoDb } from '../../db/changes.js'
 import { ExpectedPackage } from '@sofie-automation/blueprints-integration'
 import type { IngestExpectedPackage } from '../../ingest/model/IngestExpectedPackage.js'
-import { setDefaultIdOnExpectedPackages } from '../../ingest/expectedPackages.js'
-import { writeExpectedPackagesChangesForRundown } from '../../ingest/model/implementation/SaveIngestModel.js'
+import { sanitiseExpectedPackages } from '../../ingest/expectedPackages.js'
+import { ExpectedPackageId } from '@sofie-automation/corelib/dist/dataModel/Ids'
+import { Complete } from '@sofie-automation/corelib/dist/lib'

 export class StudioBaselineHelper {
 	readonly #context: JobContext
-	#pendingExpectedPackages: IngestExpectedPackage[] | undefined
+	#pendingExpectedPackages:
+		| Map<ExpectedPackageId, IngestExpectedPackage<ExpectedPackageIngestSourceStudioBaseline>>
+		| undefined
 	#pendingExpectedPlayoutItems: ExpectedPlayoutItemStudio[] | undefined

 	constructor(context: JobContext) {
@@ -27,20 +30,25 @@
 	}

 	setExpectedPackages(packages: ExpectedPackage.Any[]): void {
-		const source: ExpectedPackageIngestSource = { fromPieceType: ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS }
+		sanitiseExpectedPackages(packages)

-		setDefaultIdOnExpectedPackages(packages)
+		// Using a map here is a bit excessive, but it makes it easier to remove duplicates
+		this.#pendingExpectedPackages = new Map()
+		for (const expectedPackage of packages) {
+			const id = getExpectedPackageIdNew(this.#context.studioId, expectedPackage)

-		this.#pendingExpectedPackages = packages.map(
-			(expectedPackage) =>
-				({
-					_id: getExpectedPackageIdFromIngestSource(this.#context.studioId, source, expectedPackage._id),
-					package: expectedPackage,
-					source: source,
-				}) satisfies IngestExpectedPackage
-		)
+			this.#pendingExpectedPackages.set(id, {
+				packageId: id,
+				package: expectedPackage,
+				source: {
+					fromPieceType: ExpectedPackageDBType.STUDIO_BASELINE_OBJECTS,
+					blueprintPackageId: expectedPackage._id,
+					listenToPackageInfoUpdates: expectedPackage.listenToPackageInfoUpdates,
+				},
+			} satisfies IngestExpectedPackage<ExpectedPackageIngestSourceStudioBaseline>)
+		}
 	}
 	setExpectedPlayoutItems(playoutItems: ExpectedPlayoutItemStudio[]): void {
 		this.#pendingExpectedPlayoutItems = playoutItems
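setExpectedPackages now keys a Map on the content-derived id from getExpectedPackageIdNew, so duplicate baseline packages collapse automatically. A rough sketch of the idea, assuming the new id is derived by hashing the package contents (suggested by the hashObj usage in corelib); the hash shape and names below are illustrative:

```ts
import { createHash } from 'node:crypto'

interface PackageLike {
	_id: string // the blueprint-assigned id
	type: string
	content: unknown
}

// Illustrative stand-in for getExpectedPackageIdNew: a stable id derived from the
// owner scope plus a hash of the package content. Assumes JSON.stringify yields a
// stable key order here, which the real hashObj helper handles properly.
function contentDerivedId(studioId: string, pkg: PackageLike): string {
	const hash = createHash('sha1')
		.update(JSON.stringify({ type: pkg.type, content: pkg.content }))
		.digest('base64url')
	return `${studioId}_${hash}`
}

function dedupeBaselinePackages(studioId: string, packages: PackageLike[]): Map<string, PackageLike> {
	const pending = new Map<string, PackageLike>()
	for (const pkg of packages) {
		// Packages with identical content map to the same id, so later duplicates
		// simply overwrite the earlier entry
		pending.set(contentDerivedId(studioId, pkg), pkg)
	}
	return pending
}
```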
@@ -57,7 +65,36 @@
 				)
 				: undefined,
 			this.#pendingExpectedPackages
-				? writeExpectedPackagesChangesForRundown(this.#context, null, this.#pendingExpectedPackages)
+				? // We can be naive here, as we know the packages will have only one owner (the studio baseline)
+					saveIntoDb<ExpectedPackageDB>(
+						this.#context,
+						this.#context.directCollections.ExpectedPackages,
+						{
+							studioId: this.#context.studioId,
+							rundownId: null,
+							bucketId: null,
+						},
+						Array.from(this.#pendingExpectedPackages.values()).map(
+							(pkg) =>
+								({
+									_id: pkg.packageId,
+									studioId: this.#context.studioId,
+									rundownId: null,
+									bucketId: null,
+
+									created: Date.now(),
+									package: pkg.package,
+									ingestSources: [pkg.source],
+								}) satisfies Complete<ExpectedPackageDB>
+						),
+						{
+							beforeDiff: (doc, oldDoc) => {
+								// Preserve the created date
+								doc.created = oldDoc.created
+								return doc
+							},
+						}
+					)
 				: undefined,
 		])
diff --git a/packages/shared-lib/src/package-manager/package.ts b/packages/shared-lib/src/package-manager/package.ts
index 5c77a8fd96..bc877fc7b0 100644
--- a/packages/shared-lib/src/package-manager/package.ts
+++ b/packages/shared-lib/src/package-manager/package.ts
@@ -32,7 +32,7 @@
 	/** Generic (used in extends) */
 	export interface Base {
-		/** Unique id of the expectedPackage */
+		/** Unique id within the blueprints for the rundown */
 		_id: ExpectedPackageId
 		/** Reference to which timeline-layer(s) the Package is going to be used in.
 		 * (Used to route the package to the right playout-device (targets))
diff --git a/packages/webui/src/client/ui/Status/package-status/PackageStatus.tsx b/packages/webui/src/client/ui/Status/package-status/PackageStatus.tsx
index 37b2be42cf..84471ca119 100644
--- a/packages/webui/src/client/ui/Status/package-status/PackageStatus.tsx
+++ b/packages/webui/src/client/ui/Status/package-status/PackageStatus.tsx
@@ -24,7 +24,7 @@ export const PackageStatus: React.FC<{
 	const { t } = useTranslation()

 	const getPackageName = useCallback((): string => {
-		const p2 = props.package.package
+		const p2 = props.package.package as ExpectedPackage.Any
 		if (p2.type === ExpectedPackage.PackageType.MEDIA_FILE) {
 			return p2.content.filePath || unprotectString(props.package._id)
 		} else if (p2.type === ExpectedPackage.PackageType.QUANTEL_CLIP) {
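The PackageStatus change only restores the ExpectedPackage.Any narrowing that the wider stored package field type would otherwise lose. The name lookup it feeds works roughly like this sketch; the QUANTEL_CLIP content fields are assumed from the existing UI code since the diff is cut off at that branch, and the import path follows the dist layout used elsewhere in this diff:

```ts
import { ExpectedPackage } from '@sofie-automation/shared-lib/dist/package-manager/package'

function getPackageDisplayName(pkg: ExpectedPackage.Any, fallbackId: string): string {
	if (pkg.type === ExpectedPackage.PackageType.MEDIA_FILE) {
		// Media files are identified by their file path
		return pkg.content.filePath || fallbackId
	} else if (pkg.type === ExpectedPackage.PackageType.QUANTEL_CLIP) {
		// Assumed fields: Quantel clips carry a guid and/or title on their content
		return pkg.content.guid || pkg.content.title || fallbackId
	}
	return fallbackId
}
```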