
Commit 21c1380

Test: don't verify ES SSL cert. Remap ES test data (#247)
This commit has the integration testing app skip verification of the SSL certificate presented by the Elasticsearch server (when indexing into an HTTPS URL), which simplifies loading data into nodes with self-signed or otherwise invalid certificates. The commit also deletes the "old" templates and pipelines before reindexing. Finally, it derives new indices with changed mappings from Elasticsearch's own test data, so the field types match those used in the ES tests. (The "original" indices are still kept, since the CSV reconstruction tests are easier with them.) (cherry picked from commit 8b320e3)
1 parent 0001268 commit 21c1380
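For context, the central change in both test helpers below is a shared requests session that skips TLS certificate verification. A minimal sketch of that pattern, assuming a local HTTPS node; the URL and credentials here are illustrative placeholders, not values from the test suite:

    import requests
    import urllib3

    # Shared session: skip certificate verification so self-signed or otherwise
    # invalid certs on test nodes don't abort the run, and silence the
    # InsecureRequestWarning urllib3 would otherwise print for every request.
    session = requests.Session()
    session.verify = False
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Any request issued through the session now ignores the server certificate.
    resp = session.get("https://localhost:9200", auth=("elastic", "changeme"), timeout=60)
    print(resp.status_code)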

2 files changed: 160 additions & 45 deletions


test/integration/data.py

Lines changed: 145 additions & 37 deletions
@@ -5,6 +5,7 @@
 #
 
 import requests
+import urllib3
 import io
 import csv
 import json
@@ -205,6 +206,81 @@
     ]
 }
 
+EMP_TEST_MAPPING =\
+{
+    "mappings" : {
+        "properties" : {
+            "birth_date" : {
+                "type" : "date"
+            },
+            "emp_no" : {
+                "type" : "integer"
+            },
+            "first_name" : {
+                "type" : "text",
+                "fields" : {
+                    "keyword" : {
+                        "type" : "keyword",
+                        "ignore_above" : 256
+                    }
+                }
+            },
+            "gender" : {
+                "type" : "keyword"
+            },
+            "hire_date" : {
+                "type" : "date"
+            },
+            "languages" : {
+                "type" : "integer"
+            },
+            "last_name" : {
+                "type" : "text",
+                "fields" : {
+                    "keyword" : {
+                        "type" : "keyword",
+                        "ignore_above" : 256
+                    }
+                }
+            },
+            "salary" : {
+                "type" : "long"
+            }
+        }
+    }
+}
+
+LIB_TEST_MAPPING =\
+{
+    "mappings": {
+        "properties": {
+            "author": {
+                "type": "text",
+                "fields": {
+                    "keyword": {
+                        "type": "keyword",
+                        "ignore_above": 256
+                    }
+                }
+            },
+            "name": {
+                "type": "text",
+                "fields": {
+                    "keyword": {
+                        "type": "keyword",
+                        "ignore_above": 256
+                    }
+                }
+            },
+            "page_count": {
+                "type": "integer"
+            },
+            "release_date": {
+                "type": "date"
+            }
+        }
+    }
+}
 
 ES_DATASET_BASE_URL = "https://raw.githubusercontent.com/elastic/elasticsearch/eda31b0ac00c952a52885902be59ac429b0ca81a/x-pack/plugin/sql/qa/src/main/resources/"
 
@@ -266,8 +342,10 @@ class TestData(object):
     BATTERS_INDEX = "batters"
     LIBRARY_FILE = "library.csv"
     LIBRARY_INDEX = "library"
+    LIB_TEST_INDEX = "test_lib"
     EMPLOYEES_FILE = "employees.csv"
     EMPLOYEES_INDEX = "employees"
+    EMP_TEST_INDEX = "test_emp"
     PROTO_CASE_FILE = "SqlProtocolTestCase.java"
 
 
@@ -292,6 +370,8 @@ class TestData(object):
         MODE_INDEX: "indexed"
     }
 
+    _session = None
+
     def __init__(self, es, mode=MODE_INDEX, offline_dir=None):
         self._csv_md5 = {}
         self._csv_header = {}
@@ -301,6 +381,10 @@ def __init__(self, es, mode=MODE_INDEX, offline_dir=None):
         self._offline_dir = offline_dir
         self._mode = mode
 
+        self._req = requests.Session()
+        self._req.verify = False
+        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
     def _csv_to_json_docs(self, csv_text):
         stream = io.StringIO(csv_text)
         reader = csv.reader(stream, delimiter=',', quotechar='"')
@@ -374,24 +458,11 @@ def _get_csv_as_ndjson(self, base_url, csv_name, index_name):
         else:
             assert(base_url.endswith("/"))
             url = base_url + csv_name
-            req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT)
+            req = self._req.get(url, timeout = Elasticsearch.REQ_TIMEOUT)
             if req.status_code != 200:
                 raise Exception("failed to fetch %s with code %s" % (url, req.status_code))
         return self._csv_as_ndjson(req.text, req.encoding, index_name)
 
-
-    def _prepare_tableau_load(self, file_name, index_name, index_template):
-        ndjson = self._get_csv_as_ndjson(TABLEAU_DATASET_BASE_URL, file_name, index_name)
-
-        if self.MODE_NOINDEX < self._mode:
-            with requests.put("%s/_template/%s_template" % (self._es.base_url(), index_name),
-                    json=index_template, auth=self._es.credentials()) as req:
-                if req.status_code != 200:
-                    raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
-                        req.status_code, req.text))
-
-        return ndjson
-
     def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
         print("Indexing data for index '%s'." % index_name)
         url = "%s/%s/_bulk" % (self._es.base_url(), index_name)
@@ -400,7 +471,7 @@ def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
         if type(ndjsons) is not list:
             ndjsons = [ndjsons]
         for n in ndjsons:
-            with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"},
+            with self._req.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"},
                     auth=self._es.credentials()) as req:
                 if req.status_code not in [200, 201]:
                     raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name,
@@ -415,7 +486,7 @@ def _wait_for_results(self, index_name):
         waiting_since = time.time()
         while hits < MIN_INDEXED_DOCS:
             url = "%s/%s/_search" % (self._es.base_url(), index_name)
-            req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials())
+            req = self._req.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials())
             if req.status_code != 200:
                 raise Exception("failed to _search %s: code: %s, body: %s" % (index_name, req.status_code, req.text))
             answer = json.loads(req.text)
@@ -424,32 +495,48 @@ def _wait_for_results(self, index_name):
             if Elasticsearch.REQ_TIMEOUT < time.time() - waiting_since:
                 raise Exception("index '%s' has less than %s documents indexed" % (index_name, MIN_INDEXED_DOCS))
 
-    def _delete_if_needed(self, index_name):
+    def _del_resource(self, url):
+        with self._req.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials()) as req:
+            if req.status_code != 200 and req.status_code != 404:
+                raise Exception("Deleting %s failed; code=%s, body: %s." % (url, req.status_code, req.text))
+
+    def _delete_if_needed(self, index_name, template=False, pipeline=False):
         if self._mode != self.MODE_REINDEX:
             return
-        print("Deleting any old index '%s'." % index_name);
+        print("Deleting any old: index '%s'." % index_name);
 
         url = "%s/%s" % (self._es.base_url(), index_name)
-        with requests.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials()) as req:
-            if req.status_code != 200 and req.status_code != 404:
-                raise Exception("Deleting index %s failed; code=%s, body: %s." %
-                        (index_name, req.status_code, req.text))
+        self._del_resource(url)
+
+        if template:
+            url = "%s/_template/%s" % (self._es.base_url(), index_name)
+            self._del_resource(url)
+
+        if pipeline:
+            url = "%s/_ingest/pipeline/%s" % (self._es.base_url(), index_name)
+            self._del_resource(url)
 
     def _load_tableau_sample(self, file_name, index_name, template, pipeline=None):
-        ndjsons = self._prepare_tableau_load(file_name, index_name, template)
+        if self._mode <= self.MODE_NOINDEX:
+            return
+        self._delete_if_needed(index_name, True, pipeline is not None)
 
-        if self.MODE_NOINDEX < self._mode:
-            self._delete_if_needed(index_name)
+        with self._req.put("%s/_template/%s" % (self._es.base_url(), index_name),
+                json=template, auth=self._es.credentials()) as req:
+            if req.status_code != 200:
+                raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
+                    req.status_code, req.text))
 
-        if pipeline:
-            with requests.put("%s/_ingest/pipeline/parse_%s" % (self._es.base_url(), index_name),
-                    json=pipeline, auth=self._es.credentials()) as req:
-                if req.status_code != 200:
-                    raise Exception("PUT %s pipeline failed with code: %s (content: %s) " % (index_name,
-                        req.status_code, req.text))
+        if pipeline:
+            with self._req.put("%s/_ingest/pipeline/%s" % (self._es.base_url(), index_name),
+                    json=pipeline, auth=self._es.credentials()) as req:
+                if req.status_code != 200:
+                    raise Exception("PUT %s pipeline failed with code: %s (content: %s) " % (index_name,
+                        req.status_code, req.text))
 
-        self._post_ndjson(ndjsons, index_name, ("parse_" + index_name) if pipeline else None)
-        self._wait_for_results(index_name)
+        ndjsons = self._get_csv_as_ndjson(TABLEAU_DATASET_BASE_URL, file_name, index_name)
+        self._post_ndjson(ndjsons, index_name, index_name if pipeline else None)
+        self._wait_for_results(index_name)
 
     def _load_elastic_sample(self, file_name, index_name):
         ndjson = self._get_csv_as_ndjson(ES_DATASET_BASE_URL, file_name, index_name)
@@ -458,6 +545,25 @@ def _load_elastic_sample(self, file_name, index_name):
         self._post_ndjson(ndjson, index_name)
         self._wait_for_results(index_name)
 
+    def _derive_with_mapping(self, src_index, dst_index, mapping_json):
+        if self._mode < self.MODE_REINDEX:
+            return
+        print("Reindexing '%s' into '%s'." % (src_index, dst_index))
+        self._delete_if_needed(dst_index)
+
+        with self._req.put("%s/%s" % (self._es.base_url(), dst_index),
+                json=mapping_json, auth=self._es.credentials()) as req:
+            if req.status_code != 200:
+                raise Exception("PUT %s mapping failed with code: %s (content: %s) " % (dst_index,
+                    req.status_code, req.text))
+
+        reindex_json = {"source": {"index": src_index}, "dest": {"index": dst_index}}
+        with self._req.post("%s/_reindex?wait_for_completion=true" % self._es.base_url(),
+                json=reindex_json, auth=self._es.credentials()) as req:
+            if req.status_code != 200:
+                raise Exception("POST reindexing into %s failed with code: %s (content: %s) " % (dst_index,
+                    req.status_code, req.text))
+
     def _get_kibana_file(self, sample_name, is_mapping=True):
         print("Fetching JS sample data for index '%s'." % sample_name)
         file_name = "field_mappings.js" if is_mapping else "%s.json.gz" % sample_name
@@ -468,7 +574,7 @@ def _get_kibana_file(self, sample_name, is_mapping=True):
         else:
             url = KIBANA_SAMPLES_BASE_URL + "/" + sample_name + "/"
             url += file_name
-            req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT)
+            req = self._req.get(url, timeout = Elasticsearch.REQ_TIMEOUT)
             if req.status_code != 200:
                 raise Exception("failed to GET URL %s for index %s with: code: %s, body: %s" %
                         (url, sample_name, req.status_code, req.text))
@@ -492,8 +598,8 @@ def _put_sample_template(self, sample_name, index_name):
         # turn it to JSON (to deal with trailing commas past last member on a level
         mapping = eval(mapping)
         # PUT the built template
-        url = "%s/_template/%s_template" % (self._es.base_url(), index_name)
-        with requests.put(url, json=mapping, auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
+        url = "%s/_template/%s" % (self._es.base_url(), index_name)
+        with self._req.put(url, json=mapping, auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
             if req.status_code != 200:
                 raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
                     req.status_code, req.text))
@@ -522,7 +628,7 @@ def _load_proto_tests(self):
             case_src = f.read()
         else:
             url = ES_PROTO_CASE_BASE_URL + "/" + self.PROTO_CASE_FILE
-            req = requests.get(url, timeout=Elasticsearch.REQ_TIMEOUT)
+            req = self._req.get(url, timeout=Elasticsearch.REQ_TIMEOUT)
             if req.status_code != 200:
                 raise Exception("failed to fetch %s with code %s" % (url, req.status_code))
             case_src = req.text
@@ -550,7 +656,9 @@ def load(self):
         self._load_tableau_sample(self.BATTERS_FILE, self.BATTERS_INDEX, BATTERS_TEMPLATE, BATTERS_PIPELINE)
 
         self._load_elastic_sample(self.LIBRARY_FILE, self.LIBRARY_INDEX)
+        self._derive_with_mapping(self.LIBRARY_INDEX, self.LIB_TEST_INDEX, LIB_TEST_MAPPING)
         self._load_elastic_sample(self.EMPLOYEES_FILE, self.EMPLOYEES_INDEX)
+        self._derive_with_mapping(self.EMPLOYEES_INDEX, self.EMP_TEST_INDEX, EMP_TEST_MAPPING)
 
         self._load_kibana_sample(self.ECOMMERCE_INDEX)
         self._load_kibana_sample(self.FLIGHTS_INDEX)

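The new _derive_with_mapping() step above uses only two stock Elasticsearch REST calls: create the destination index with the remapped field types, then copy the documents over with the _reindex API, which indexes them under the destination's mapping. A rough stand-alone sketch of the same flow, reusing the verification-free session idea from the earlier example; the base URL, credentials and the trimmed mapping are placeholders, not the test suite's actual values:

    import requests
    import urllib3

    session = requests.Session()
    session.verify = False
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    base_url = "https://localhost:9200"   # placeholder test node
    auth = ("elastic", "changeme")        # placeholder credentials

    # 1. Create the destination index with the desired mappings
    #    (the real run would pass EMP_TEST_MAPPING from the diff above).
    emp_test_mapping = {"mappings": {"properties": {"salary": {"type": "long"}}}}
    resp = session.put("%s/test_emp" % base_url, json=emp_test_mapping, auth=auth)
    resp.raise_for_status()

    # 2. Reindex the already-loaded source index into it and wait for completion.
    reindex_body = {"source": {"index": "employees"}, "dest": {"index": "test_emp"}}
    resp = session.post("%s/_reindex?wait_for_completion=true" % base_url,
                        json=reindex_body, auth=auth)
    resp.raise_for_status()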
test/integration/elasticsearch.py

Lines changed: 15 additions & 8 deletions
@@ -5,6 +5,7 @@
 #
 
 import requests
+import urllib3
 import psutil
 
 import sys
@@ -43,6 +44,8 @@ class Elasticsearch(object):
     _credentials = None
     _base_url = None
 
+    _req = None
+
     def __init__(self, offline_dir=None, url=None):
         self._offline_dir = offline_dir
         if not url:
@@ -57,6 +60,10 @@ def __init__(self, offline_dir=None, url=None):
         print("Using Elasticsearch instance at %s, credentials (%s, %s)" % (self._base_url, self._credentials[0],
                 "*" * len(self._credentials[1])))
 
+        self._req = requests.session()
+        self._req.verify = False
+        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
     @staticmethod
     def elasticsearch_distro_filename(version):
         return "%s-%s%s.%s" % (ES_PROJECT, version, ES_ARCH, PACKAGING)
@@ -68,13 +75,13 @@ def base_url(self):
         return self._base_url
 
     def _latest_build(self, version):
-        req = requests.get(ARTIF_URL, timeout=self.REQ_TIMEOUT)
+        req = self._req.get(ARTIF_URL, timeout=self.REQ_TIMEOUT)
         vers = req.json()["versions"]
         if version not in vers:
             raise Exception("version %s not found; available: %s" % (version, vers))
 
         builds_url = ARTIF_URL + "/" + version + "/builds"
-        req = requests.get(builds_url, timeout=self.REQ_TIMEOUT)
+        req = self._req.get(builds_url, timeout=self.REQ_TIMEOUT)
         builds = req.json()["builds"]
 
         # file name to download
@@ -86,7 +93,7 @@ def _latest_build(self, version):
         # requests.async: no Windows support
         for build in builds:
             build_url = builds_url + "/" + build
-            req = requests.get(build_url, timeout=self.REQ_TIMEOUT)
+            req = self._req.get(build_url, timeout=self.REQ_TIMEOUT)
             build_json = req.json()["build"]
             dt_et = datetime.strptime(build_json["end_time"], "%a, %d %b %Y %X %Z")
             if latest_et < dt_et:
@@ -103,7 +110,7 @@ def _latest_build(self, version):
     def _download_build(self, url, dest_dir):
         print("- downloading %s: " % url, end='')
         size = 0
-        req = requests.get(url, stream=True, timeout=self.REQ_TIMEOUT)
+        req = self._req.get(url, stream=True, timeout=self.REQ_TIMEOUT)
         dest_file = os.path.join(dest_dir, req.url[req.url.rfind('/')+1 : ])
         with open(dest_file, "wb") as f:
             for chunk in req.iter_content(chunk_size = 32 * K1):
@@ -217,12 +224,12 @@ def _enable_xpack(self, es_dir):
         # setup passwords to random generated ones first...
         pwd = self._gen_passwords(es_dir)
         # ...then change passwords, easier to restart with failed tests
-        req = requests.post("%s/_security/user/_password" % self._base_url, auth=(self._credentials[0], pwd),
+        req = self._req.post("%s/_security/user/_password" % self._base_url, auth=(self._credentials[0], pwd),
                 json={"password": self._credentials[1]})
         if req.status_code != 200:
             raise Exception("attempt to change elastic's password failed with code %s" % req.status_code)
         # kibana too (debug convenience)
-        req = requests.post("%s/_security/user/kibana/_password" % self._base_url, auth=self._credentials,
+        req = self._req.post("%s/_security/user/kibana/_password" % self._base_url, auth=self._credentials,
                 json={"password": self._credentials[1]})
         if req.status_code != 200:
             print("ERROR: kibana user password change failed with code: %s" % req.status_code)
@@ -231,7 +238,7 @@ def _enable_xpack(self, es_dir):
         url = "%s/_license/start_trial?acknowledge=true" % self._base_url
         failures = 0
         while True:
-            req = requests.post(url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
+            req = self._req.post(url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
             if req.status_code == 200:
                 # TODO: check content?
                 break
@@ -282,7 +289,7 @@ def reset(self, es_dir):
 
     def cluster_name(self, fail_on_non200=True):
         try:
-            resp = requests.get(self._base_url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
+            resp = self._req.get(self._base_url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
         except (requests.Timeout, requests.ConnectionError):
             return None
         if resp.status_code != 200:
