Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/platform/packages/shared/kbn-storage-adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,21 @@ export type StorageClientBulkOperation<TDocument extends { _id?: string }> =
}
| { delete: { _id: string } };

export interface StorageClientBulkOptions {
/**
* If true, throws BulkOperationError when any operation in the bulk request fails.
* If false (default), returns the response with errors field populated, similar to Promise.allSettled behavior.
* @default false
*/
throwOnFail?: boolean;
}

export type StorageClientBulkRequest<TDocument extends { _id?: string }> = Omit<
BulkRequest,
'operations' | 'index'
> & {
operations: Array<StorageClientBulkOperation<TDocument>>;
};
} & StorageClientBulkOptions;
export type StorageClientBulkResponse = BulkResponse;

export type StorageClientDeleteRequest = Omit<DeleteRequest, 'index'>;
Expand Down Expand Up @@ -91,6 +100,13 @@ export type StorageClientSearch<TDocumentType = never> = <
request: TSearchRequest
) => Promise<StorageClientSearchResponse<TDocumentType, TSearchRequest>>;

/**
* Performs bulk operations on documents.
*
* By default, behaves similar to Promise.allSettled - individual operation failures
* are returned in the response without throwing an error. Set `throwOnFail: true`
* to throw a BulkOperationError when any operation fails.
*/
export type StorageClientBulk<TDocumentType extends { _id?: string } = never> = (
request: StorageClientBulkRequest<TDocumentType>
) => Promise<StorageClientBulkResponse>;
Expand Down Expand Up @@ -159,4 +175,6 @@ export type StorageDocumentOf<TStorageSettings extends StorageSettings> = Partia

export { StorageIndexAdapter } from './src/index_adapter';

export { BulkOperationError } from './src/errors';

export { types } from './types';
17 changes: 17 additions & 0 deletions src/platform/packages/shared/kbn-storage-adapter/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { BulkResponse } from '@elastic/elasticsearch/lib/api/types';

export class BulkOperationError extends Error {
constructor(message: string, public response: BulkResponse) {
super(message);
this.name = 'BulkOperationError';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import type {
BulkOperationContainer,
BulkOperationType,
IndexResponse,
IndicesIndexState,
IndicesIndexTemplate,
Expand Down Expand Up @@ -40,6 +41,7 @@ import type {
} from '../..';
import { getSchemaVersion } from '../get_schema_version';
import type { StorageMappingProperty } from '../../types';
import { BulkOperationError } from '../errors';

function getAliasName(name: string) {
return name;
Expand Down Expand Up @@ -411,6 +413,7 @@ export class StorageIndexAdapter<
private bulk: StorageClientBulk<TApplicationType> = ({
operations,
refresh = 'wait_for',
throwOnFail = false,
...request
}): Promise<StorageClientBulkResponse> => {
if (operations.length === 0) {
Expand Down Expand Up @@ -475,6 +478,21 @@ export class StorageIndexAdapter<
};

return this.validateComponentsBeforeWriting(attemptBulk).then(async (response) => {
// Check for errors and throw if throwOnFail is true
if (throwOnFail) {
const erroredItems = response.items.filter((item) => {
const operation = Object.keys(item)[0] as BulkOperationType;
return item[operation]?.error;
});
if (erroredItems.length > 0) {
throw new BulkOperationError(
`Bulk operation failed for ${erroredItems.length} out of ${
response.items.length
} items: ${JSON.stringify(erroredItems)}`,
response
);
}
}
return response;
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import type {
StorageClientIndexResponse,
StorageDocumentOf,
} from '../../..';
import { StorageIndexAdapter, type StorageSettings } from '../../..';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { httpServerMock } from '@kbn/core/server/mocks';
import { BulkOperationError, StorageIndexAdapter, type StorageSettings } from '../../..';
import * as getSchemaVersionModule from '../../get_schema_version';
import { isResponseError } from '@kbn/es-errors';
import type { IndicesGetResponse } from '@elastic/elasticsearch/lib/api/types';
Expand Down Expand Up @@ -389,6 +389,103 @@ describe('StorageIndexAdapter', () => {
});
});

describe('when bulk operation encounters errors', () => {
beforeAll(async () => {
await createServers();
});

afterAll(async () => {
await stopServers();
});

it('throws BulkOperationError when bulk operation contains document-level errors', async () => {
// Create an adapter with strict mapping to trigger mapping errors
const strictAdapter = createStorageIndexAdapter({
name: 'test_strict_index',
schema: {
properties: {
foo: {
type: 'keyword',
},
},
},
});
const strictClient = strictAdapter.getClient();

// First create the index with strict mapping
await strictClient.index({
id: 'doc1',
document: { foo: 'bar' },
});

// Try to bulk index with an invalid field (should fail due to dynamic: strict)
await expect(
strictClient.bulk({
operations: [
{
index: {
_id: 'doc2',
document: { foo: 'baz', invalid_field: 'value' } as any,
},
},
],
refresh: 'wait_for',
throwOnFail: true,
})
).rejects.toThrow(BulkOperationError);

await strictClient.clean();
});

it('includes error details in the thrown BulkOperationError', async () => {
const strictAdapter = createStorageIndexAdapter({
name: 'test_strict_index_2',
schema: {
properties: {
foo: {
type: 'keyword',
},
},
},
});
const strictClient = strictAdapter.getClient();

// Create the index
await strictClient.index({
id: 'doc1',
document: { foo: 'bar' },
});

try {
await strictClient.bulk({
operations: [
{
index: {
_id: 'doc2',
document: { foo: 'baz', invalid_field: 'value' } as any,
},
},
],
throwOnFail: true,
refresh: 'wait_for',
});
fail('Expected BulkOperationError to be thrown');
} catch (err) {
const error = err as BulkOperationError;
expect(error).toBeInstanceOf(BulkOperationError);
expect(error.message).toContain('Bulk operation failed');
expect(error.message).toContain('1 out of 1 items');
expect(error.response).toBeDefined();
expect(error.response.errors).toBe(true);
expect(error.response.items).toHaveLength(1);
expect(error.response.items[0].index?.error).toBeDefined();
expect(error.response.items[0].index?.error?.type).toBe('strict_dynamic_mapping_exception');
}

await strictClient.clean();
});
});

describe('when writing/bootstrapping with an legacy index', () => {
beforeAll(async () => {
await createServers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ export class AssetClient {
},
};
}),
throwOnFail: true,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export class FeatureClient {
},
};
}),
throwOnFail: true,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ export class ExecutionPlan {
return this.dependencies.storageClient.bulk({
operations: actions.map(dotDocumentActionToBulkOperation),
refresh: true,
throwOnFail: true,
});
}

Expand Down