Skip to content

Commit b57901a

Browse files
return status and error information
1 parent a891cec commit b57901a

4 files changed

Lines changed: 68 additions & 30 deletions

File tree

docs/reference/bulk.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ The bulk helper can be called as follows:
4040
```php
4141
use Elastic\Elasticsearch\Helper\Bulk;
4242

43-
$count = Bulk::bulk($client, $index, $actions, $chunk_size = 500, $max_chunk_bytes = 100 * 1024 * 1024);
43+
$response = Bulk::bulk($client, $index, $actions, $statS_only = false, $chunk_size = 500, $max_chunk_bytes = 100 * 1024 * 1024);
4444
```
4545

4646
This function has three required arguments:
@@ -50,15 +50,23 @@ This function has three required arguments:
5050
- `$actions` is the iterable that yields the bulk actions, normally implemented
5151
as a generator.
5252

53+
The `$stats_only` optional argument controls wether details of each individual
54+
operation are included in the response. A value of `true` can be passed to only
55+
return total amount of operations processed and count of errors.
56+
5357
The two optional arguments, `$chunk_size` and`$max_chunk_bytes`, determine when
5458
a Bulk API request is issued. The helper accumulates actions and submits a Bulk
5559
API request when the action count reaches `$chunk_size` or the payload size
5660
reaches `$max_chunk_bytes`, whichever happens first. The application can
5761
trigger a Bulk API request to be sent at specific times by yielding a `flush`
5862
action from its generator.
5963

60-
The return value of the `bulk()` function is the total number of actions
61-
successfully processed.
64+
The return value of the `bulk()` function is an array with three elements:
65+
66+
- The total number of actions successfully processed
67+
- The count of errors
68+
- An array with the details of each operation, as returned by the bulk API. Note that
69+
this array is omitted when the `$stats_only` argument is set to `true`.
6270

6371
## Bulk actions
6472

examples/dense_vector_benchmark.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,20 @@ function upload($client, $index, $dataset, $chunk_size, $repetitions, $packed) {
7676
$len = sizeof($dataset);
7777
$body = [];
7878
$start = microtime(true);
79-
Bulk::bulk($client, $index, get_next_document($dataset, $repetitions, $packed), $chunk_size);
79+
$response = Bulk::bulk($client, $index, get_next_document($dataset, $repetitions, $packed), true, $chunk_size);
80+
assert ($response[0] == sizeof($dataset)); // make sure all items were ingested
81+
assert ($response[1] == 0); // make sure there were no errors
8082
return microtime(true) - $start;
8183
}
8284

8385
$opts = getopt('s:r:', array('url:', 'json', 'runs:', 'help'), $rest_index);
8486
if (array_key_exists('help', $opts)) {
85-
echo 'Usage: ' . $argv[0] . '[-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n';
86-
echo ' -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n';
87-
echo ' -r REPETITIONS Number of times the dataset is repeated (default: 20)\n';
88-
echo ' --url URL The Elasticsearch connection URL\n';
89-
echo ' --json Output benchmark results in JSON format\n';
90-
echo ' --runs Number of runs that are averaged for each chunk size (default: 3)\n';
87+
echo "Usage: " . $argv[0] . "[-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n";
88+
echo " -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n";
89+
echo " -r REPETITIONS Number of times the dataset is repeated (default: 20)\n";
90+
echo " --url URL The Elasticsearch connection URL\n";
91+
echo " --json Output benchmark results in JSON format\n";
92+
echo " --runs Number of runs that are averaged for each chunk size (default: 3)\n";
9193
exit(0);
9294
}
9395
if (!array_key_exists('url', $opts)) {

src/Helper/Bulk.php

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,27 @@ class Bulk
2828
* @param $client Elasticsearch client
2929
* @param $defaultIndex The index that the bulk operation applies to
3030
* @param $actions Iterator that provides actions to include in the bulk requests
31+
* @param $stats_only set to true to omit the individual operation results.
3132
* @param $chunkSize the maximum number of operations in a bulk request (DEFAULT: 500)
3233
* @param $maxChunkBytes the maximum size of a bulk request (DEFAULT: 10MB)
33-
* @throws BulkHelperException
34+
* @return array<mixed> an array where the first and second elements are the
35+
* total number of operations processed and the total number of errors.
36+
* If $stats_only is false, a third element is returned, with an array of
37+
* individual operation results, as returned by the bulk API.
3438
* @throws ClientResponseException
3539
* @throws ServerResponseException
3640
*/
37-
public static function bulk(Client $client, string $index, Iterator $actions, int $chunk_size = 500, int $max_chunk_bytes = 100 * 1024 * 1024): int {
41+
public static function bulk(
42+
Client $client,
43+
string $index,
44+
Iterator $actions,
45+
bool $stats_only = false,
46+
int $chunk_size = 500,
47+
int $max_chunk_bytes = 100 * 1024 * 1024,
48+
): array {
3849
$totalCount = 0;
50+
$totalErrors = 0;
51+
$results = [];
3952
$chunkCount = 0;
4053
$chunkBytes = 0;
4154
$body = [];
@@ -51,10 +64,14 @@ public static function bulk(Client $client, string $index, Iterator $actions, in
5164
}
5265
if (count($action) == 0 || $chunkCount >= $chunk_size || $chunkBytes >= $max_chunk_bytes) {
5366
$response = $client->bulk(['index' => $index, 'body' => $body]);
54-
if ($response['errors']) {
55-
$error = new BulkHelperException('Bulk upload error');
56-
$error->setResponse($response);
57-
throw $error;
67+
foreach ($response['items'] as $item) {
68+
$status = $item[array_key_first($item)]['status'];
69+
if ($status < 200 || $status >= 300) {
70+
$totalErrors += 1;
71+
}
72+
if (!$stats_only) {
73+
$results[] = $item;
74+
}
5875
}
5976
$body = [];
6077
$chunkCount = 0;
@@ -64,15 +81,17 @@ public static function bulk(Client $client, string $index, Iterator $actions, in
6481
}
6582
if (!empty($body)) {
6683
$response = $client->bulk(['index' => $index, 'body' => $body]);
67-
if ($response['errors']) {
68-
$error = new BulkHelperException('Bulk upload error');
69-
$error->setResponse($response);
70-
throw $error;
84+
foreach ($response['items'] as $item) {
85+
$status = $item[array_key_first($item)]['status'];
86+
if ($status < 200 || $status >= 300) {
87+
$totalErrors += 1;
88+
}
89+
$results[] = $item;
7190
}
7291
unset($body);
7392
unset($response);
7493
}
75-
return $totalCount;
94+
return [$totalCount, $totalErrors, $results];
7695
}
7796

7897
private static function action(string $action, array $metadata, ?array $document): array {

tests/Integration/BulkTest.php

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,13 @@ function flushByCountActions($client, $index) {
177177
assert($response['hits']['total']['value'] == 3);
178178
}
179179

180-
$count = Bulk::bulk(
180+
$response = Bulk::bulk(
181181
$this->client, self::TEST_INDEX,
182-
flushByCountActions($this->client, self::TEST_INDEX), 2
182+
flushByCountActions($this->client, self::TEST_INDEX), true, 2
183183
);
184-
$this->assertEquals($count, 6);
184+
$this->assertEquals($response[0], 6);
185+
$this->assertEquals($response[1], 0);
186+
$this->assertEquals($response[2], []); // only stats in this response
185187

186188
$response = readIndex($this->client, self::TEST_INDEX);
187189
$this->assertEquals(200, $response->getStatusCode());
@@ -232,11 +234,15 @@ function flushBySizeActions($client, $index) {
232234
assert($response['hits']['total']['value'] == 4);
233235
}
234236

235-
$count = Bulk::bulk(
237+
$response = Bulk::bulk(
236238
$this->client, self::TEST_INDEX,
237-
flushBySizeActions($this->client, self::TEST_INDEX), 500, 40
239+
flushBySizeActions($this->client, self::TEST_INDEX),
240+
false, // include individual item results
241+
500, 40,
238242
);
239-
$this->assertEquals($count, 6);
243+
$this->assertEquals($response[0], 6);
244+
$this->assertEquals($response[1], 0);
245+
$this->assertEquals(sizeof($response[2]), 6);
240246

241247
$response = readIndex($this->client, self::TEST_INDEX);
242248
$this->assertEquals(200, $response->getStatusCode());
@@ -294,11 +300,14 @@ function explicitFlushActions($client, $index) {
294300
assert($response['hits']['total']['value'] == 3);
295301
}
296302

297-
$count = Bulk::bulk(
303+
$response = Bulk::bulk(
298304
$this->client, self::TEST_INDEX,
299-
explicitFlushActions($this->client, self::TEST_INDEX)
305+
explicitFlushActions($this->client, self::TEST_INDEX),
306+
true, // stats only
300307
);
301-
$this->assertEquals($count, 6);
308+
$this->assertEquals($response[0], 6);
309+
$this->assertEquals($response[1], 0);
310+
$this->assertEquals($response[2], []);
302311

303312
$response = readIndex($this->client, self::TEST_INDEX);
304313
$this->assertEquals(200, $response->getStatusCode());

0 commit comments

Comments
 (0)