Skip to content

Commit c5bf625

Browse files
authored
Merge pull request #160 from gregolsky/v4.0
Bulk insert performance and error handling enhancements
2 parents 8af0db3 + b2e7fd4 commit c5bf625

File tree

10 files changed

+296
-124
lines changed

10 files changed

+296
-124
lines changed

perf/bulkInsert.js

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
const {
2+
DocumentStore,
3+
DeleteDatabasesOperation,
4+
CreateDatabaseOperation,
5+
RequestExecutor
6+
} = require("../dist");
7+
const {
8+
bench,
9+
settings
10+
} = require("./common");
11+
12+
// Store with no default database: used for server-wide maintenance
// operations (create/delete database) around each benchmark run.
const nodbStore = new DocumentStore(settings.urls);
nodbStore.initialize();

// RequestExecutor.requestPostProcessor = (req) => {
//     req.proxy = "http://127.0.0.1:8888";
// };

// Store bound to the benchmark database; the bulk-insert runs go through it.
const store = new DocumentStore(settings.urls, settings.database);
store.initialize();
21+
22+
/**
 * Plain data holder for the benchmark documents: copies every supplied
 * property onto the instance, tolerating construction with no arguments.
 */
class Order {
    constructor(props) {
        Object.assign(this, props || {});
    }
}
29+
30+
// Populated by each bench's before() hook with the Order documents to insert;
// read by bulkInsertPipeline().
let data;
31+
32+
/*
 * Benchmark driver: for each configured attempt count, recreates the target
 * database, bulk-inserts 1000 generated orders per run, and drops the
 * database afterwards. Stores are disposed even if a bench throws.
 */
(async function main() {

    // Builds per-bench lifecycle hooks: reset the database and regenerate
    // `n` fresh Order documents before each benchmark, clean up after.
    const getBenchOpts = (n) => ({
        async before() {
            const dataGen = getData();
            // Materialize n orders from the infinite generator.
            data = Array.from({ length: n }, () => new Order(dataGen.next().value));
            try {
                // Drop any leftover database from a previous run.
                await nodbStore.maintenance.server.send(new DeleteDatabasesOperation({
                    databaseNames: [settings.database],
                    hardDelete: true
                }));
            } finally {
                // Always (re)create the database, even if the delete failed
                // (e.g. because the database did not exist yet).
                await nodbStore.maintenance.server.send(new CreateDatabaseOperation({
                    databaseName: settings.database
                }));
            }
        },
        async after() {
            // Remove the benchmark database once the bench is done.
            await nodbStore.maintenance.server.send(new DeleteDatabasesOperation({
                databaseNames: [settings.database],
                hardDelete: true
            }));
        }
    });

    try {
        const name = "bulk-insert-2018-10-18-pipeline";
        await bench(name, 10, bulkInsertPipeline, getBenchOpts(1000));
        await bench(name, 50, bulkInsertPipeline, getBenchOpts(1000));
        await bench(name, 100, bulkInsertPipeline, getBenchOpts(1000));
    } finally {
        // Dispose both stores so the process can exit cleanly.
        store.dispose();
        nodbStore.dispose();
    }

}());
68+
69+
/**
 * Returns a random date within 2018 (month Jan..Nov, day 1..25).
 *
 * Fix: the original day expression `Math.floor(Math.random() * 25)` could
 * yield 0, and `new Date(y, m, 0)` rolls back to the last day of the
 * previous month — for month 0 that produced dates in December 2017.
 *
 * @returns {Date} a date in 2018 with day-of-month in [1, 25]
 */
function randomDate() {
    const month = Math.floor(Math.random() * 11);   // 0..10 (Jan..Nov)
    const day = 1 + Math.floor(Math.random() * 25); // 1..25
    return new Date(2018, month, day);
}
72+
73+
/**
 * Random non-negative integer in [0, max).
 *
 * @param {number} [max=100] exclusive upper bound
 * @returns {number} an integer in [0, max)
 */
function randomInt(max = 100) {
    const scaled = Math.random() * max;
    return Math.floor(scaled);
}
76+
77+
// Infinite generator yielding pseudo-random Order documents modeled on the
// RavenDB Northwind sample data (order lines, shipping address, etc.).
// Callers pull exactly as many documents as they need via .next().
function* getData() {
    let i = 1;
    while (true) {
        // NOTE(review): i is incremented before the first yield, so document
        // IDs start at "orders/2" — presumably unintentional but harmless
        // for benchmark data; confirm if exact IDs ever matter.
        i++;
        yield new Order({
            "Id": "orders/" + i,
            "Name": "Order #" + i,
            "Company": "companies/58-A",
            "Employee": "employees/2-A",
            "Freight": randomInt(),
            "Lines": [{
                "Discount": 0,
                "PricePerUnit": randomInt(),
                "Product": "products/11-A",
                "ProductName": "Queso Cabrales",
                "Quantity": 10
            },
            {
                "Discount": 0,
                "PricePerUnit": 4.5,
                "Product": "products/24-A",
                "ProductName": "Guaraná Fantástica",
                "Quantity": randomInt()
            }
            ],
            "OrderedAt": randomDate(),
            "RequireAt": randomDate(),
            "ShipTo": {
                "City": "México D.F.",
                "Country": "Mexico",
                "Line1": "Calle Dr. Jorge Cash 321",
                "Line2": null,
                "Location": {
                    "Latitude": Math.random() * 100,
                    "Longitude": Math.random() * 100
                },
                "PostalCode": "05033",
                "Region": null
            },
            "ShipVia": "shippers/2-A",
            "ShippedAt": null
        });
    }
}
121+
122+
/**
 * Benchmark body: pushes every generated document through a single
 * bulk-insert session on the shared store, then flushes the session.
 */
async function bulkInsertPipeline() {
    const inserter = store.bulkInsert();
    for (let idx = 0; idx < data.length; idx++) {
        await inserter.store(data[idx]);
    }
    await inserter.finish();
}

perf/common.js

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,36 @@
1-
import { DocumentStore } from "../src";
2-
31
// tslint:disable-next-line:no-var-requires
42
const settings = require("./settings.json");
53

6-
export function getStore() {
7-
return new DocumentStore(settings.urls, settings.database);
8-
}
4+
/**
 * Runs `run()` `attempts` times sequentially, timing the whole batch with
 * console.time/timeEnd under the label "<name> x<attempts>".
 *
 * Hook and run errors are logged and swallowed so a single failing
 * setup/teardown or iteration does not abort the rest of the suite.
 *
 * @param {string} name      label prefix for the timer/profile
 * @param {number} attempts  number of sequential invocations of `run`
 * @param {Function} run     async benchmark body
 * @param {object} [opts]    optional { before, after } async lifecycle hooks
 */
async function bench(name, attempts, run, opts) {
    const benchName = `${name} x${ attempts }`;

    if (opts && opts.before) {
        try {
            await opts.before();
        } catch (err) {
            console.log("BENCH BEFORE", err);
        }
    }

    // console.profile()/profileEnd() are non-standard, inspector-only
    // helpers and may be absent from a console implementation — guard so
    // a missing method cannot abort the measured loop.
    const canProfile = typeof console.profile === "function"
        && typeof console.profileEnd === "function";

    console.time(benchName);
    try {
        if (canProfile) {
            console.profile(benchName);
        }
        for (let n = 0; n < attempts; n++) {
            await run();
        }
        if (canProfile) {
            console.profileEnd(benchName);
        }
    } catch (err) {
        console.log("BENCH ERROR", err);
    }
    console.timeEnd(benchName);

    if (opts && opts.after) {
        try {
            await opts.after();
        } catch (err) {
            console.log("BENCH AFTER", err);
        }
    }
}
35+
36+
// Public surface of the perf harness: shared settings + the bench runner.
module.exports = { settings, bench };

perf/loadPipeline.js

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,19 @@ const streamValues = require("stream-json/streamers/StreamValues")
1212
const StreamUtil = require("../dist/Utility/StreamUtil");
1313
const stream = require("readable-stream");
1414
const Asm = require('stream-json/Assembler');
15+
const { bench } = require("./common");
1516

1617
const store = new DocumentStore("http://localhost:8080", "Perf");
1718
store.initialize();
1819

19-
async function bench(name, attempts, run) {
20-
const benchName = `${name} x${ attempts }`;
21-
console.time(benchName);
22-
for (let n = 0; n < attempts; n++) {
23-
await run();
24-
}
25-
console.timeEnd(benchName);
26-
}
27-
2820
(async function main() {
2921
{
30-
const name = "4.0.4-load-full-pipeline";
22+
const name = "load-full-pipeline";
3123
await bench(name, 10, loadPipeline);
3224
await bench(name, 50, loadPipeline);
3325
await bench(name, 100, loadPipeline);
3426
}
3527

36-
// {
37-
// const name = "stream-json-with-proper-casing";
38-
// // enhancedStreamJson();
39-
// await bench(name, 10, enhancedStreamJson);
40-
// await bench(name, 50, enhancedStreamJson);
41-
// await bench(name, 100, enhancedStreamJson);
42-
// }
43-
4428
store.dispose();
4529
}());
4630

@@ -68,8 +52,6 @@ async function rawStreamJson() {
6852
await donePromise;
6953
}
7054

71-
72-
7355
async function enhancedStreamJson() {
7456
const dataStream = fs.createReadStream("./data/load_data.json");
7557
const streams = [
@@ -96,5 +78,4 @@ async function enhancedStreamJson() {
9678
});
9779
await StreamUtil.pipelineAsync(streams);
9880
const result = await donePromise;
99-
// console.log(JSON.stringify(result, null, 2));
10081
}

perf/results/bulk-insert.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# BULK INSERT
2+
3+
## Current 4.0.3 - 2018-10-16
4+
5+
```
6+
bulk-insert-2018-16-10-pipeline x10: 21635.223ms
7+
```
8+
9+
# 4.0.4 optimizations - 2018-10-18
10+
11+
Buffer.concat() usage and redundant buffering logic removal.
12+
13+
```
14+
bulk-insert-2018-16-10-pipeline x10: 2490.231ms
15+
bulk-insert-2018-16-10-pipeline x50: 8280.333ms
16+
bulk-insert-2018-16-10-pipeline x100: 15802.916ms
17+
```

0 commit comments

Comments
 (0)