-
Notifications
You must be signed in to change notification settings - Fork 968
Bulk helper #1560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Bulk helper #1560
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
3398d2d
Bulk helper
miguelgrinberg 52a28e4
explicit flush action
miguelgrinberg e846f90
documentation
miguelgrinberg 39075ab
more docs improvements
miguelgrinberg a891cec
Merge branch 'main' into bulk-helper
miguelgrinberg b57901a
return status and error information
miguelgrinberg e51fe6c
update yaml tests to use 9.4 branch
miguelgrinberg 04c12b4
typos
miguelgrinberg b767474
more docs improvements
miguelgrinberg 2d8ed60
Merge branch 'main' into bulk-helper
miguelgrinberg 37b512b
restore main branch in yaml tests
miguelgrinberg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| # Bulk ingest [bulk-ingest] | ||
|
|
||
| The Bulk API can be cumbersome to use directly, due to its payload formatting | ||
| requirements. The PHP client includes a bulk helper function with a simplified | ||
| interface. | ||
|
|
||
| With the bulk helper, the application has to provide a generator or iterable | ||
| that yields the individual bulk actions. The helper then formats and submits | ||
| the bulk request, optionally splitting the data into chunks based on maximum | ||
| number of actions or payload size. | ||
|
|
||
| The following example submits a bulk request that indexes three documents: | ||
|
|
||
| ```php | ||
| use Elastic\Elasticsearch\Helper\Bulk; | ||
|
|
||
| function get_next_document() | ||
| { | ||
| yield Bulk::index_action([ | ||
| 'title' => 'document 1 title', | ||
| 'body' => 'document 1 body', | ||
| ]); | ||
| yield Bulk::index_action([ | ||
| 'title' => 'document 2 title', | ||
| 'body' => 'document 2 body', | ||
| ]); | ||
| yield Bulk::index_action([ | ||
| 'title' => 'document 3 title', | ||
| 'body' => 'document 3 body', | ||
| ]); | ||
| } | ||
|
|
||
| Bulk::bulk($client, 'my_index', get_next_document()) | ||
| ``` | ||
|
|
||
| ## The bulk helper function | ||
|
|
||
| The bulk helper can be called as follows: | ||
|
|
||
| ```php | ||
| use Elastic\Elasticsearch\Helper\Bulk; | ||
|
|
||
| $response = Bulk::bulk($client, $index, $actions, $stats_only = false, $chunk_size = 500, $max_chunk_bytes = 100 * 1024 * 1024); | ||
| ``` | ||
|
|
||
| This function has three required arguments: | ||
|
|
||
| - `$client` is the client to use to submit Bulk API requests, | ||
| - `$index` is the default index that actions will be applied to, | ||
| - `$actions` is the iterable that yields the bulk actions, normally implemented | ||
| as a generator. | ||
|
|
||
| The `$stats_only` optional argument controls whether details of each individual | ||
| operation are included in the response. When this argument is set to `true`, | ||
| these details are omitted. The default is `false`. | ||
|
|
||
| The two optional arguments `$chunk_size` and`$max_chunk_bytes` determine how | ||
| often Bulk API requests are issued. The helper stores actions in memory and | ||
| only submits a Bulk API request when the action count reaches `$chunk_size` or | ||
| the payload size reaches `$max_chunk_bytes`, whichever happens first. The | ||
| application can also trigger an explicit Bulk API request to be issued by | ||
| yielding a `flush` action from its generator. | ||
|
|
||
| The return value of the `bulk()` function is an array with three elements: | ||
|
|
||
| - The total number of actions that were processed | ||
| - The count of errors | ||
| - An array with the status of each operation, as returned by the bulk API. This | ||
| array is omitted when the `$stats_only` argument is set to `true`. | ||
|
|
||
| ## Bulk actions | ||
|
|
||
| A Bulk API request includes a list of operations, called *actions*. The Bulk | ||
| API supports four different actions: `index`, `create`, `update` and`delete`. | ||
| A `flush` action that is specific to this helper is also available. | ||
|
|
||
| ### Index | ||
|
|
||
| The `index` action indexes the specified document. If the document already | ||
| exists, it replaces it and increments the version. | ||
|
|
||
| ```php | ||
| yield Bulk::index_action($document, $id = null, $other_metadata = null); | ||
| ``` | ||
|
|
||
| The `$document` argument is an array with the document to index. The document's | ||
| unique ID can be passed in the `$id` argument, if desired. When `$id` is not | ||
| provided, the server generates a unique document ID. Any other attributes of | ||
| the index action can be passed in the `$other_metadata` argument. | ||
|
|
||
| The following example indexes a document with ID `123`: | ||
|
|
||
| ```php | ||
| yield Bulk::index_action([ | ||
| 'field1' => 'value1', | ||
| ], '123'); | ||
| ``` | ||
|
|
||
| The next example explicitly names the index that the action applies to: | ||
|
|
||
| ```php | ||
| yield Bulk::index_action([ | ||
| 'field1' => 'value1', | ||
| ], '123', ['_index' => 'some-other-index']); | ||
| ``` | ||
|
|
||
| ### Create | ||
|
|
||
| The `create` action indexes the specified document if it does not already | ||
| exist. | ||
|
|
||
| ```php | ||
| yield Bulk::create_action($document, $id = null, $other_metadata = null); | ||
| ``` | ||
|
|
||
| The `$document` argument is an array with the document to create. The | ||
| document's unique ID can be passed in the `$id` argument, if desired. When | ||
| `$id` is not provided, the server generates a unique document ID. Any other | ||
| attributes of the index action can be passed in the `$other_metadata` argument. | ||
|
|
||
| The following example creates a document: | ||
|
|
||
| ```php | ||
| yield Bulk::create_action([ | ||
| 'field1' => 'value1', | ||
| ]); | ||
| ``` | ||
|
|
||
| ### Update | ||
|
|
||
| The `update` action performs a partial document update. | ||
|
|
||
| ```php | ||
| yield Bulk::update_action($document_updates, $id, $other_metadata = null); | ||
| ``` | ||
|
|
||
| The `$document_updates` argument is an array with the desired updates to the | ||
| document, formatted as required by the Bulk API | ||
| [update action](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk#operation-bulk-body-application-json-updateaction-object). | ||
| The `$id` argument is the ID of the document to update. Any other attributes of | ||
| the index action can be passed in the `$other_metadata` argument. | ||
|
|
||
| The following example updates the value of field `field2` in the document with | ||
| ID `123`: | ||
|
|
||
| ```php | ||
| yield Bulk::update_action([ | ||
| 'doc' => [ | ||
| 'field2' => 'value2', | ||
| ], | ||
| ], '123'); | ||
| ``` | ||
|
|
||
| ### Delete | ||
|
|
||
| The `delete` action removes the specified document from the index. | ||
|
|
||
| ```php | ||
| yield Bulk::delete_action($id, $other_metadata = null); | ||
| ``` | ||
|
|
||
| The `$id` argument is the ID of the document delete. Any other attributes of | ||
| the index action can be passed in the `$other_metadata` argument. | ||
|
|
||
| The following example deletes the document with ID `123`: | ||
|
|
||
| ```php | ||
| yield Bulk::delete_action('123'); | ||
| ``` | ||
|
|
||
| ### Flush | ||
|
|
||
| The `flush` action instructs the bulk helper to submit a Bulk API request with | ||
| all the actions accumulated up to that point. This allows the application to | ||
| override the logic that decides when to submit in a Bulk API request based on | ||
| count of actions or payload size. | ||
|
|
||
| ```php | ||
| yield Bulk::flush_action(); | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,4 +24,5 @@ toc: | |
| - file: client-helpers.md | ||
| children: | ||
| - file: iterators.md | ||
| - file: esql.md | ||
| - file: esql.md | ||
| - file: bulk.md | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| <?php | ||
| /** | ||
| * Elasticsearch PHP Client | ||
| * | ||
| * @link https://github.com/elastic/elasticsearch-php | ||
| * @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co) | ||
| * @license https://opensource.org/licenses/MIT MIT License | ||
| * | ||
| * Licensed to Elasticsearch B.V under one or more agreements. | ||
| * Elasticsearch B.V licenses this file to you under the MIT License. | ||
| * See the LICENSE file in the project root for more information. | ||
| */ | ||
| declare(strict_types = 1); | ||
| ini_set('memory_limit', '1024M'); | ||
|
|
||
| require_once __DIR__ . '/../vendor/autoload.php'; | ||
|
|
||
| use Elastic\Elasticsearch\Helper\Bulk; | ||
| use Elastic\Elasticsearch\Helper\Vectors; | ||
|
|
||
| $ELASTICSEARCH_URL = ''; | ||
| $chunk_sizes = array(100, 250, 500, 1000); | ||
| $repetitions = 20; | ||
| $json_output = FALSE; | ||
| $runs = 3; | ||
| $dataset_file = ''; | ||
| $rest_index = null; | ||
| $dataset = []; | ||
| $index = 'benchmark'; | ||
|
|
||
| function get_next_document($dataset, $repetitions, $packed) { | ||
| $len = sizeof($dataset); | ||
| for ($i = 1; $i <= $len * $repetitions; $i++) { | ||
| $doc = $dataset[($i - 1) % $len]; | ||
| yield Bulk::index_action([ | ||
| 'docid' => $doc['docid'], | ||
| 'title' => $doc['title'], | ||
| 'text' => $doc['text'], | ||
| 'emb' => $packed ? Vectors::packDenseVector($doc['emb']) : $doc['emb'] | ||
| ]); | ||
| } | ||
| } | ||
|
|
||
| function upload($client, $index, $dataset, $chunk_size, $repetitions, $packed) { | ||
| // create index | ||
| if ($client->indices()->exists(['index' => $index])->getStatusCode() != 404) { | ||
| $client->indices()->delete(['index' => $index]); | ||
| } | ||
| $client->indices()->create([ | ||
| 'index' => $index, | ||
| 'body' => [ | ||
| 'mappings' => [ | ||
| 'properties' => [ | ||
| 'docid' => [ | ||
| 'type' => 'keyword', | ||
| ], | ||
| 'title' => [ | ||
| 'type' => 'text', | ||
| ], | ||
| 'text' => [ | ||
| 'type' => 'text', | ||
| ], | ||
| 'emb' => [ | ||
| 'type' => 'dense_vector', | ||
| 'index_options' => [ | ||
| 'type' => 'flat', | ||
| ], | ||
| ], | ||
| ], | ||
| ], | ||
| ], | ||
| ]); | ||
| $client->indices()->refresh(['index' => $index]); | ||
|
|
||
| // run the bulk upload | ||
| $len = sizeof($dataset); | ||
| $body = []; | ||
| $start = microtime(true); | ||
| $response = Bulk::bulk($client, $index, get_next_document($dataset, $repetitions, $packed), true, $chunk_size); | ||
| assert ($response[0] == sizeof($dataset)); // make sure all items were ingested | ||
| assert ($response[1] == 0); // make sure there were no errors | ||
| return microtime(true) - $start; | ||
| } | ||
|
|
||
| $opts = getopt('s:r:', array('url:', 'json', 'runs:', 'help'), $rest_index); | ||
| if (array_key_exists('help', $opts)) { | ||
| echo "Usage: " . $argv[0] . "[-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n"; | ||
| echo " -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n"; | ||
| echo " -r REPETITIONS Number of times the dataset is repeated (default: 20)\n"; | ||
| echo " --url URL The Elasticsearch connection URL\n"; | ||
| echo " --json Output benchmark results in JSON format\n"; | ||
| echo " --runs Number of runs that are averaged for each chunk size (default: 3)\n"; | ||
| exit(0); | ||
| } | ||
| if (!array_key_exists('url', $opts)) { | ||
| echo 'Error: --url argument is required.'; | ||
| exit(1); | ||
| } | ||
| else { | ||
| $ELASTICSEARCH_URL = $opts['url']; | ||
| } | ||
| if (array_key_exists('s', $opts)) { | ||
| $chunk_sizes = array_map(fn($v) => intval($v), explode(',', $opts['s'])); | ||
| } | ||
| if (array_key_exists('r', $opts)) { | ||
| $repetitions = intval($opts['r']); | ||
| } | ||
| if (array_key_exists('json', $opts)) { | ||
| $json_output = TRUE; | ||
| } | ||
| if (array_key_exists('runs', $opts)) { | ||
| $runs = intval($opts['runs']); | ||
| } | ||
| if (!$argv[$rest_index]) { | ||
| echo 'Error'; | ||
| exit(1); | ||
| } | ||
| else { | ||
| $dataset_file = $argv[$rest_index]; | ||
| } | ||
|
|
||
| // read CSV dataset | ||
| $f = fopen($dataset_file, 'rt'); | ||
| while (!feof($f)) { | ||
| $line = fgets($f); | ||
| if ($line !== FALSE) { | ||
| $dataset[] = json_decode($line, true); | ||
| } | ||
| } | ||
| fclose($f); | ||
|
|
||
| // initialize client | ||
| $client = Elastic\Elasticsearch\ClientBuilder::create() | ||
| ->setHosts([$ELASTICSEARCH_URL]) | ||
| ->build(); | ||
|
|
||
| // run the benchmark | ||
| $results = []; | ||
| foreach ($chunk_sizes as $chunk_size) { | ||
| if (!$json_output) { | ||
| echo 'Uploading ' . $dataset_file . ' with chunk size ' . $chunk_size . "...\n"; | ||
| } | ||
| $normal_runs = []; | ||
| $packed_runs = []; | ||
| for ($run = 0; $run < $runs; $run++) { | ||
| $normal_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, FALSE); | ||
| $packed_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, TRUE); | ||
| } | ||
| $t = array_sum($normal_runs) / $runs; | ||
| $pt = array_sum($packed_runs) / $runs; | ||
| $result = [ | ||
| 'dataset_size' => sizeof($dataset) * $repetitions, | ||
| 'chunk_size' => $chunk_size, | ||
| 'float32' => [ | ||
| 'duration' => intval($t * 1000 + 0.5), | ||
| ], | ||
| 'base64' => [ | ||
| 'duration' => intval($pt * 1000 + 0.5), | ||
| ], | ||
| ]; | ||
| $results[] = $result; | ||
| if (!$json_output) { | ||
| echo 'Size: ' . $result['dataset_size'] . "\n"; | ||
| echo 'float duration: ' . number_format($t, 2) . 's (' . number_format($result['dataset_size'] / $t, 2) . " docs/s)\n"; | ||
| echo 'base64 duration: ' . number_format($pt, 2) . 's (' . number_format($result['dataset_size'] / $pt, 2) . " docs/s)\n"; | ||
| echo 'Speed up: ' . number_format($t / $pt, 2) . "x\n"; | ||
| } | ||
| } | ||
|
|
||
| if ($json_output) { | ||
| echo json_encode($results, JSON_PRETTY_PRINT); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.