-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathelasticIndexer.js
115 lines (97 loc) · 3.27 KB
/
elasticIndexer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import process from 'process';
import elasticsearch from 'elasticsearch';
function VerifyEnvironmentVarsExist(envVars){
if(!envVars){
throw new Error("envVars is undefined.");
}
envVars.map(item => {
if(!process.env[item])
throw new Error(item + " environment variabe is not defined");
});
return true;
}
function elasticClient(){
if(VerifyEnvironmentVarsExist(["ES_HOST", "ES_PORT", "ES_AUTH_USER", "PBF_DIRECTORY", "ES_AUTH_PWD"])){
return new elasticsearch.Client({
host: [
{
host: process.env.ES_HOST,
auth: process.env.ES_AUTH_USER + ":" + process.env.ES_AUTH_PWD
}
],
keepAlive: true,
log: {
type: 'stdio',
levels: ['error', 'warning']
},
requestTimeout: 20000
});
}
}
function transformToElasticBulk(collection, header){
let body = [];
for(let [key, value] of collection){
let hdr_tmp = JSON.parse(JSON.stringify(header));
if(!value.id){
console.error('Missed document ID. Skipping ' + key);
console.log(JSON.stringify(value));
console.log(JSON.stringify(header));
process.exit();
}
hdr_tmp.index['_id'] = value.id;
body.push(hdr_tmp);
body.push(value);
}
return body;
}
function defaultErrorHandler(response, placesToIndex){
response.items.map(item => {
if(item.index && item.index.status > 201){
console.error('An elastic error occured while trying to index document ' + item.index["_id"] + ' to ' + item.index["_index"]);
console.log(JSON.stringify(item));
let place = placesToIndex.get(item.index["_id"]);
console.log(JSON.stringify(place));
}
});
}
function invokeElasticSearchQuery(connection, index, type, queryBody, callback){
connection.search({
index: index,
type: type,
body: queryBody
}, (error, response) => {
callback(response);
}
);
}
function elasticBulkIndex(connection, tilesToIndex, placesToIndex, callback, errorHandler){
let placeIndexHeader = {
index: { _index: 'places', _type: 'place'}
};
let tilesIndexHeader = {
index: { _index: 'tiles', _type: 'geotile'}
};
let body = [];
body = body.concat(transformToElasticBulk(tilesToIndex, tilesIndexHeader));
body = body.concat(transformToElasticBulk(placesToIndex, placeIndexHeader));
connection.bulk({
body: body
},
(err, response) => {
if(response.errors){
if (errorHandler && typeof(errorHandler) == "function"){
errorHandler(response, placesToIndex);
}else{
defaultErrorHandler(response, placesToIndex);
}
}else{
console.log('Succesfully indexed ' + body.length + ' documents. Took: ' + response.took);
}
callback();
});
}
module.exports = {
elasticBulkIndex: elasticBulkIndex,
elasticConnection: elasticClient,
invokeElasticSearchQuery: invokeElasticSearchQuery
}