Skip to content

Improves support for Elasticsearch storage backend. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: stackhpc/wallaby
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 52 additions & 21 deletions cloudkitty/storage/v2/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,35 @@
default='http://localhost:9200'),
cfg.StrOpt(
'index_name',
help='Elasticsearch index to use. Defaults to "cloudkitty".',
help='Elasticsearch index to use. '
'Defaults to "cloudkitty".',
default='cloudkitty'),
cfg.BoolOpt('insecure',
help='Set to true to allow insecure HTTPS '
'connections to Elasticsearch',
default=False),
cfg.StrOpt('cafile',
help='Path of the CA certificate to trust for '
'HTTPS connections.',
default=None),
cfg.IntOpt('scroll_duration',
help="Duration (in seconds) for which the ES scroll contexts "
"should be kept alive.",
advanced=True,
default=30, min=0, max=300),
cfg.StrOpt(
'template_name',
help='Elasticsearch template name to use. '
'Defaults to "cloudkitty_mapping".',
default='cloudkitty_mapping'),
cfg.ListOpt(
'component_templates',
help='List of Elasticsearch component template '
'names to include in the index template. ',
default=[]),
cfg.BoolOpt(
'insecure',
help='Set to true to allow insecure HTTPS '
'connections to Elasticsearch',
default=False),
cfg.StrOpt(
'cafile',
help='Path of the CA certificate to trust for '
'HTTPS connections.',
default=None),
cfg.IntOpt(
'scroll_duration',
help="Duration (in seconds) for which the ES scroll contexts "
"should be kept alive.",
advanced=True,
default=30, min=0, max=300)
]

CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
Expand Down Expand Up @@ -100,14 +114,31 @@ def __init__(self, *args, **kwargs):
verify=verify)

def init(self):
r = self._conn.get_index()
if r.status_code != 200:
raise exceptions.IndexDoesNotExist(
CONF.storage_elasticsearch.index_name)
LOG.info('Creating mapping "_doc" on index {}...'.format(
LOG.info('Creating index template for mapping.')
index_pattern = "{}-*".format(CONF.storage_elasticsearch.index_name)
component_templates = CONF.storage_elasticsearch.component_templates
index_template = self._conn.build_index_template(
index_pattern, component_templates, CLOUDKITTY_INDEX_MAPPING)
self._conn.put_index_template(
CONF.storage_elasticsearch.template_name, index_template)
LOG.info('Index template for mapping created.')

# If index_name exists, test to ensure it is an alias
if self._conn.exists_index():
if not self._conn.is_index_alias():
raise exceptions.IndexAliasAlreadyExists(
CONF.storage_elasticsearch.index_name)
LOG.info('Index alias already exists. Skipping creation.')

# Otherwise create a dated index with index_name as an alias
else:
LOG.info('Creating first index.')
self._conn.put_first_index()

# Rollover index on startup
LOG.info('Rolling over index {}'.format(
CONF.storage_elasticsearch.index_name))
self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING)
LOG.info('Mapping created.')
self._conn.post_index_rollover()

def push(self, dataframes, scope_id=None):
for frame in dataframes:
Expand Down
121 changes: 116 additions & 5 deletions cloudkitty/storage/v2/elasticsearch/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def _build_should(filters):
{'term': {'metadata.' + k: v}}]
return should

def _build_composite(self, groupby):
@staticmethod
def _build_composite(groupby):
if not groupby:
return []
sources = []
Expand Down Expand Up @@ -147,11 +148,121 @@ def _req(self, method, url, data, params, deserialize=True):
self._log_query(url, data, output)
return output

def _req_unsafe(self, method, url, data, params):
"""Request without exception on invalid HTTP codes."""
return method(url, data=data, params=params)

def _req_exists(self, url, data, params):
r = self._sess.head(url, data=data, params=params)
if r.status_code == 200:
return True
elif r.status_code == 404:
return False
else:
raise exceptions.InvalidStatusCode(
"200/404", r.status_code, r.text, None)

@staticmethod
def build_index_template(index_pattern, component_templates, mapping):
"""Build an index template for mapping."""
# High priority template to avoid being overridden.
template_priority = 500
return {
"index_patterns": [index_pattern],
"priority": template_priority,
"composed_of": component_templates,
"template": {
"mappings": mapping
}
}

def put_index_template(self, template_name, template):
"""Does a PUT request against the ES template API.

Does a PUT request against `/_template/<template_name>`
if es6 or `/_index_template/<template_name>` if es7.

:param template_name: index template name
:type template_name: string
:param template: index template
:type template: dict
:rtype: requests.models.Response
"""
url = '/'.join(
(self._url, '_index_template', template_name))
data = json.dumps(template)
LOG.debug('Creating index template {} with data:\n{}'.format(
template_name, data))
return self._req(
self._sess.put, url, data, None, deserialize=False)

def put_first_index(self):
"""Does a PUT request against the ES index API.

Does a PUT request against `/<index_name>-{now/d}-000001`.

Creates a dated index with an alias for which it is the write index.

:rtype: requests.models.Response
"""
# Percent encode the / (%2F) in the date math for the index name
index_string = "<{}{}>".format(self._index_name, "-{now%2Fd}-000001")
url = '/'.join((self._url, index_string))
aliases = {
"aliases": {
self._index_name: {
"is_write_index": True
}
}
}
LOG.debug('Creating index {} with data:\n{}'.format(
index_string, json.dumps(aliases)))
return self._req(
self._sess.put, url, json.dumps(aliases), None, deserialize=False)

def post_index_rollover(self):
"""Does a POST request against the ES index API.

Does a POST request against `/<index_name>/_rollover`.

Performs a rollover of the index alias.

:rtype: requests.models.Response
"""
url = '/'.join((self._url, self._index_name, '_rollover'))
self._req(self._sess.post, url, None, None, deserialize=False)

def exists_index(self):
"""Does a HEAD request against the ES index API.

Does a HEAD request against `/<index_name>`.

Tests if an index or index alias exists.

:rtype: Boolean
"""
url = '/'.join((self._url, self._index_name))
param = {"allow_no_indices": "false"}
return self._req_exists(url, None, param)

def is_index_alias(self):
"""Does a HEAD request against the ES alias API.

Does a HEAD request against `/_alias/<index_name>`.

Tests if an index alias exists.

:rtype: Boolean
"""
url = '/'.join((self._url, '_alias', self._index_name))
return self._req_exists(url, None, None)

def put_mapping(self, mapping):
"""Does a PUT request against ES's mapping API.

The PUT request will be done against
`/<index_name>/_mapping/<mapping_name>`
Does a PUT request against `/<index_name>/_mapping/<mapping_name>`

Creates or updates an index mapping.

:mapping: body of the request
:type mapping: dict
Expand All @@ -168,7 +279,7 @@ def put_mapping(self, mapping):
def get_index(self):
"""Does a GET request against ES's index API.

The GET request will be done against `/<index_name>`
Does a GET request against `/<index_name>`

:rtype: requests.models.Response
"""
Expand Down Expand Up @@ -360,7 +471,7 @@ def total(self, begin, end, metric_types, filters, groupby,

must = self._build_must(begin, end, metric_types, filters)
should = self._build_should(filters)
composite = self._build_composite(groupby) if groupby else None
composite = self._build_composite(groupby)
if composite:
composite['size'] = self._chunk_size
query = self._build_query(must, should, composite)
Expand Down
8 changes: 8 additions & 0 deletions cloudkitty/storage/v2/elasticsearch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ def __init__(self, index_name):
super(IndexDoesNotExist, self).__init__(
"Elasticsearch index {} does not exist".format(index_name)
)


class IndexAliasAlreadyExists(BaseElasticsearchException):
def __init__(self, index_name):
super(IndexAliasAlreadyExists, self).__init__(
"Elasticsearch index alias {} already exists as an index".format(
index_name)
)
132 changes: 132 additions & 0 deletions cloudkitty/tests/storage/v2/elasticsearch/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dateutil import tz

from cloudkitty import dataframe
import cloudkitty.storage.v2.elasticsearch
from cloudkitty.storage.v2.elasticsearch import client
from cloudkitty.storage.v2.elasticsearch import exceptions

Expand Down Expand Up @@ -181,6 +182,137 @@ def test_req_invalid_status_code(self):
self.client._req,
method_mock, None, None, None)

def test_req_unsafe(self):
url = '/endpoint'
data = {'1': 'one'}
params = {'v'}
resp_mock = mock.MagicMock()
resp_mock.status_code = 400
method_mock = mock.MagicMock()
method_mock.return_value = resp_mock
req_resp = self.client._req_unsafe(
method_mock, url, data, params)
method_mock.assert_called_once_with(
url, data=data, params=params)
self.assertEqual(req_resp, resp_mock)

def test_req_exists(self):
url = '/endpoint'
data = {'1': 'one'}
params = {'v'}
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
self.client._req_exists(
url, data=data, params=params)
hmock.assert_called_once_with(
url, data=data, params=params)

def test_req_exists_true(self):
url = '/endpoint'
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
self.assertTrue(self.client._req_exists(
url, data=None, params=None))

def test_req_exists_false(self):
url = '/endpoint'
resp_mock = mock.MagicMock()
resp_mock.status_code = 404
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
self.assertFalse(self.client._req_exists(
url, data=None, params=None))

def test_req_exists_exception(self):
url = '/endpoint'
resp_mock = mock.MagicMock()
resp_mock.status_code = 418 # I'm a teapot
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
self.assertRaises(exceptions.InvalidStatusCode,
self.client._req_exists,
url, data=None, params=None)

def test_build_index_template(self):
index_pattern = "cloudkitty-*"
mapping = cloudkitty.storage.v2.elasticsearch.CLOUDKITTY_INDEX_MAPPING
component_templates = ["cloudkitty_settings"]
expected = {
"index_patterns": ["cloudkitty-*"],
"priority": 500,
"composed_of": component_templates,
"template": {
"mappings": mapping
}
}
self.assertEqual(
self.client.build_index_template(
index_pattern, component_templates, mapping), expected)

def test_put_index_template(self):
template_name = 'test_template'
template = {
"index_patterns": ["index_name-*"],
"priority": 500,
"template": {
"mappings": "fake_mapping"
}
}
expected_data = \
('{"index_patterns": ["index_name-*"], "priority": 500, '
'"template": {"mappings": "fake_mapping"}}')
with mock.patch.object(self.client, '_req') as rmock:
self.client.put_index_template(
template_name, template)
rmock.assert_called_once_with(
self.client._sess.put,
'http://elasticsearch:9200/_index_template/test_template',
expected_data, None, deserialize=False)

def test_put_first_index(self):
expected_data = '{"aliases": {"index_name": {"is_write_index": true}}}'
with mock.patch.object(self.client, '_req') as rmock:
self.client.put_first_index()
rmock.assert_called_once_with(
self.client._sess.put,
'http://elasticsearch:9200/<index_name-{now%2Fd}-000001>',
expected_data, None, deserialize=False)

def test_post_index_rollover(self):
with mock.patch.object(self.client, '_req') as rmock:
self.client.post_index_rollover()
rmock.assert_called_once_with(
self.client._sess.post,
'http://elasticsearch:9200/index_name/_rollover',
None, None, deserialize=False)

def test_exists_index(self):
expected_param = {"allow_no_indices": "false"}
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
r = self.client.exists_index()
hmock.assert_called_once_with(
'http://elasticsearch:9200/index_name',
data=None, params=expected_param)
self.assertTrue(r)

def test_is_index_alias(self):
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
with mock.patch.object(self.client._sess, 'head') as hmock:
hmock.return_value = resp_mock
r = self.client.is_index_alias()
hmock.assert_called_once_with(
'http://elasticsearch:9200/_alias/index_name',
data=None, params=None)
self.assertTrue(r)

def test_put_mapping(self):
mapping = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
Expand Down
Loading