Skip to content

Commit 4e7c4f9

Browse files
authored
Storage adapter: Propagate bulk errors (#242901)
Currently, the `bulk` method of the storage adapter is not failing if one of the operations fails. This can hide problems, e.g. if saving a stream would cause the `create` operation in the bulk call to fail, it would fail silently and tell the user saving worked fine. With the change on this PR, the error is passed back to the user correctly (had to mess with the mappings of the index to provoke an error, not sure whether it can be triggered accidentally): <img width="309" height="492" alt="Screenshot 2025-11-13 at 14 08 36" src="https://github.com/user-attachments/assets/aaba6185-23e3-475f-9545-fd595b5557e2" /> This is a breaking change of the storage adapter interface, but judging from the current usages we don't want this kind of error silently being ignored - throwing an error makes sure the problem is visible.
1 parent 38f89e0 commit 4e7c4f9

File tree

8 files changed

+152
-2
lines changed

8 files changed

+152
-2
lines changed

src/platform/packages/shared/kbn-storage-adapter/index.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,21 @@ export type StorageClientBulkOperation<TDocument extends { _id?: string }> =
5555
}
5656
| { delete: { _id: string } };
5757

58+
export interface StorageClientBulkOptions {
59+
/**
60+
* If true, throws BulkOperationError when any operation in the bulk request fails.
61+
* If false (default), returns the response with errors field populated, similar to Promise.allSettled behavior.
62+
* @default false
63+
*/
64+
throwOnFail?: boolean;
65+
}
66+
5867
export type StorageClientBulkRequest<TDocument extends { _id?: string }> = Omit<
5968
BulkRequest,
6069
'operations' | 'index'
6170
> & {
6271
operations: Array<StorageClientBulkOperation<TDocument>>;
63-
};
72+
} & StorageClientBulkOptions;
6473
export type StorageClientBulkResponse = BulkResponse;
6574

6675
export type StorageClientDeleteRequest = Omit<DeleteRequest, 'index'>;
@@ -91,6 +100,13 @@ export type StorageClientSearch<TDocumentType = never> = <
91100
request: TSearchRequest
92101
) => Promise<StorageClientSearchResponse<TDocumentType, TSearchRequest>>;
93102

103+
/**
104+
* Performs bulk operations on documents.
105+
*
106+
* By default, behaves similar to Promise.allSettled - individual operation failures
107+
* are returned in the response without throwing an error. Set `throwOnFail: true`
108+
* to throw a BulkOperationError when any operation fails.
109+
*/
94110
export type StorageClientBulk<TDocumentType extends { _id?: string } = never> = (
95111
request: StorageClientBulkRequest<TDocumentType>
96112
) => Promise<StorageClientBulkResponse>;
@@ -159,4 +175,6 @@ export type StorageDocumentOf<TStorageSettings extends StorageSettings> = Partia
159175

160176
export { StorageIndexAdapter } from './src/index_adapter';
161177

178+
export { BulkOperationError } from './src/errors';
179+
162180
export { types } from './types';
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import type { BulkResponse } from '@elastic/elasticsearch/lib/api/types';
11+
12+
export class BulkOperationError extends Error {
13+
constructor(message: string, public response: BulkResponse) {
14+
super(message);
15+
this.name = 'BulkOperationError';
16+
}
17+
}

src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import type {
1111
BulkOperationContainer,
12+
BulkOperationType,
1213
IndexResponse,
1314
IndicesIndexState,
1415
IndicesIndexTemplate,
@@ -40,6 +41,7 @@ import type {
4041
} from '../..';
4142
import { getSchemaVersion } from '../get_schema_version';
4243
import type { StorageMappingProperty } from '../../types';
44+
import { BulkOperationError } from '../errors';
4345

4446
function getAliasName(name: string) {
4547
return name;
@@ -351,6 +353,7 @@ export class StorageIndexAdapter<
351353
private bulk: StorageClientBulk<TApplicationType> = ({
352354
operations,
353355
refresh = 'wait_for',
356+
throwOnFail = false,
354357
...request
355358
}): Promise<StorageClientBulkResponse> => {
356359
if (operations.length === 0) {
@@ -393,6 +396,21 @@ export class StorageIndexAdapter<
393396
};
394397

395398
return this.validateComponentsBeforeWriting(attemptBulk).then(async (response) => {
399+
// Check for errors and throw if throwOnFail is true
400+
if (throwOnFail) {
401+
const erroredItems = response.items.filter((item) => {
402+
const operation = Object.keys(item)[0] as BulkOperationType;
403+
return item[operation]?.error;
404+
});
405+
if (erroredItems.length > 0) {
406+
throw new BulkOperationError(
407+
`Bulk operation failed for ${erroredItems.length} out of ${
408+
response.items.length
409+
} items: ${JSON.stringify(erroredItems)}`,
410+
response
411+
);
412+
}
413+
}
396414
return response;
397415
});
398416
};

src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/integration_tests/index.test.ts

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import type {
1414
StorageClientIndexResponse,
1515
StorageDocumentOf,
1616
} from '../../..';
17-
import { StorageIndexAdapter, type StorageSettings } from '../../..';
17+
import { BulkOperationError, StorageIndexAdapter, type StorageSettings } from '../../..';
1818
import type { Logger } from '@kbn/core/server';
1919
import * as getSchemaVersionModule from '../../get_schema_version';
2020
import { isResponseError } from '@kbn/es-errors';
@@ -348,6 +348,99 @@ describe('StorageIndexAdapter', () => {
348348
});
349349
});
350350

351+
describe('when bulk operation encounters errors', () => {
352+
afterAll(async () => {
353+
await client?.clean();
354+
});
355+
356+
it('throws BulkOperationError when bulk operation contains document-level errors', async () => {
357+
// Create an adapter with strict mapping to trigger mapping errors
358+
const strictAdapter = createStorageIndexAdapter({
359+
name: 'test_strict_index',
360+
schema: {
361+
properties: {
362+
foo: {
363+
type: 'keyword',
364+
},
365+
},
366+
},
367+
});
368+
const strictClient = strictAdapter.getClient();
369+
370+
// First create the index with strict mapping
371+
await strictClient.index({
372+
id: 'doc1',
373+
document: { foo: 'bar' },
374+
});
375+
376+
// Try to bulk index with an invalid field (should fail due to dynamic: strict)
377+
await expect(
378+
strictClient.bulk({
379+
operations: [
380+
{
381+
index: {
382+
_id: 'doc2',
383+
document: { foo: 'baz', invalid_field: 'value' } as any,
384+
},
385+
},
386+
],
387+
refresh: 'wait_for',
388+
throwOnFail: true,
389+
})
390+
).rejects.toThrow(BulkOperationError);
391+
392+
await strictClient.clean();
393+
});
394+
395+
it('includes error details in the thrown BulkOperationError', async () => {
396+
const strictAdapter = createStorageIndexAdapter({
397+
name: 'test_strict_index_2',
398+
schema: {
399+
properties: {
400+
foo: {
401+
type: 'keyword',
402+
},
403+
},
404+
},
405+
});
406+
const strictClient = strictAdapter.getClient();
407+
408+
// Create the index
409+
await strictClient.index({
410+
id: 'doc1',
411+
document: { foo: 'bar' },
412+
});
413+
414+
try {
415+
await strictClient.bulk({
416+
operations: [
417+
{
418+
index: {
419+
_id: 'doc2',
420+
document: { foo: 'baz', invalid_field: 'value' } as any,
421+
},
422+
},
423+
],
424+
throwOnFail: true,
425+
refresh: 'wait_for',
426+
});
427+
fail('Expected BulkOperationError to be thrown');
428+
} catch (err) {
429+
const error = err as BulkOperationError;
430+
expect(error).toBeInstanceOf(BulkOperationError);
431+
expect(error.message).toContain('Bulk operation failed');
432+
expect(error.message).toContain('1 out of 1 items');
433+
expect(error.response).toBeDefined();
434+
expect(error.response.errors).toBe(true);
435+
expect(error.response.items).toHaveLength(1);
436+
expect(error.response.items[0].index?.error).toBeDefined();
437+
expect(error.response.items[0].index?.error?.type).toBe('strict_dynamic_mapping_exception');
438+
}
439+
440+
await strictClient.clean();
441+
});
442+
});
443+
351444
describe('when writing/bootstrapping with an existing, incompatible index', () => {
352445
beforeAll(async () => {
353446
await client.index({ id: 'foo', document: { foo: 'bar' } });

x-pack/platform/plugins/shared/streams/server/lib/streams/assets/asset_client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ export class AssetClient {
279279
},
280280
};
281281
}),
282+
throwOnFail: true,
282283
});
283284
}
284285

x-pack/platform/plugins/shared/streams/server/lib/streams/attachments/attachment_client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ export class AttachmentClient {
386386

387387
const bulkResponse = await this.clients.storageClient.bulk({
388388
operations: bulkOperations,
389+
throwOnFail: true,
389390
});
390391

391392
return { errors: bulkResponse.errors, items: bulkResponse.items };

x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ export class FeatureClient {
150150
},
151151
};
152152
}),
153+
throwOnFail: true,
153154
});
154155
}
155156

x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ export class ExecutionPlan {
424424
return this.dependencies.storageClient.bulk({
425425
operations: actions.map(dotDocumentActionToBulkOperation),
426426
refresh: true,
427+
throwOnFail: true,
427428
});
428429
}
429430

0 commit comments

Comments
 (0)