diff --git a/cloudkitty/storage/v2/elasticsearch/__init__.py b/cloudkitty/storage/v2/elasticsearch/__init__.py index 058e9126..b6673873 100644 --- a/cloudkitty/storage/v2/elasticsearch/__init__.py +++ b/cloudkitty/storage/v2/elasticsearch/__init__.py @@ -37,21 +37,35 @@ default='http://localhost:9200'), cfg.StrOpt( 'index_name', - help='Elasticsearch index to use. Defaults to "cloudkitty".', + help='Elasticsearch index to use. ' + 'Defaults to "cloudkitty".', default='cloudkitty'), - cfg.BoolOpt('insecure', - help='Set to true to allow insecure HTTPS ' - 'connections to Elasticsearch', - default=False), - cfg.StrOpt('cafile', - help='Path of the CA certificate to trust for ' - 'HTTPS connections.', - default=None), - cfg.IntOpt('scroll_duration', - help="Duration (in seconds) for which the ES scroll contexts " - "should be kept alive.", - advanced=True, - default=30, min=0, max=300), + cfg.StrOpt( + 'template_name', + help='Elasticsearch template name to use. ' + 'Defaults to "cloudkitty_mapping".', + default='cloudkitty_mapping'), + cfg.ListOpt( + 'component_templates', + help='List of Elasticsearch component template ' + 'names to include in the index template. 
', + default=[]), + cfg.BoolOpt( + 'insecure', + help='Set to true to allow insecure HTTPS ' + 'connections to Elasticsearch', + default=False), + cfg.StrOpt( + 'cafile', + help='Path of the CA certificate to trust for ' + 'HTTPS connections.', + default=None), + cfg.IntOpt( + 'scroll_duration', + help="Duration (in seconds) for which the ES scroll contexts " + "should be kept alive.", + advanced=True, + default=30, min=0, max=300) ] CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP) @@ -100,14 +114,31 @@ def __init__(self, *args, **kwargs): verify=verify) def init(self): - r = self._conn.get_index() - if r.status_code != 200: - raise exceptions.IndexDoesNotExist( - CONF.storage_elasticsearch.index_name) - LOG.info('Creating mapping "_doc" on index {}...'.format( + LOG.info('Creating index template for mapping.') + index_pattern = "{}-*".format(CONF.storage_elasticsearch.index_name) + component_templates = CONF.storage_elasticsearch.component_templates + index_template = self._conn.build_index_template( + index_pattern, component_templates, CLOUDKITTY_INDEX_MAPPING) + self._conn.put_index_template( + CONF.storage_elasticsearch.template_name, index_template) + LOG.info('Index template for mapping created.') + + # If index_name exists, test to ensure it is an alias + if self._conn.exists_index(): + if not self._conn.is_index_alias(): + raise exceptions.IndexAliasAlreadyExists( + CONF.storage_elasticsearch.index_name) + LOG.info('Index alias already exists. 
Skipping creation.') + + # Otherwise create a dated index with index_name as an alias + else: + LOG.info('Creating first index.') + self._conn.put_first_index() + + # Rollover index on startup + LOG.info('Rolling over index {}'.format( CONF.storage_elasticsearch.index_name)) - self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING) - LOG.info('Mapping created.') + self._conn.post_index_rollover() def push(self, dataframes, scope_id=None): for frame in dataframes: diff --git a/cloudkitty/storage/v2/elasticsearch/client.py b/cloudkitty/storage/v2/elasticsearch/client.py index 79651b8c..6b3e78a9 100644 --- a/cloudkitty/storage/v2/elasticsearch/client.py +++ b/cloudkitty/storage/v2/elasticsearch/client.py @@ -92,7 +92,8 @@ def _build_should(filters): {'term': {'metadata.' + k: v}}] return should - def _build_composite(self, groupby): + @staticmethod + def _build_composite(groupby): if not groupby: return [] sources = [] @@ -147,11 +148,121 @@ def _req(self, method, url, data, params, deserialize=True): self._log_query(url, data, output) return output + def _req_unsafe(self, method, url, data, params): + """Request without exception on invalid HTTP codes.""" + return method(url, data=data, params=params) + + def _req_exists(self, url, data, params): + r = self._sess.head(url, data=data, params=params) + if r.status_code == 200: + return True + elif r.status_code == 404: + return False + else: + raise exceptions.InvalidStatusCode( + "200/404", r.status_code, r.text, None) + + @staticmethod + def build_index_template(index_pattern, component_templates, mapping): + """Build an index template for mapping.""" + # High priority template to avoid being overridden. + template_priority = 500 + return { + "index_patterns": [index_pattern], + "priority": template_priority, + "composed_of": component_templates, + "template": { + "mappings": mapping + } + } + + def put_index_template(self, template_name, template): + """Does a PUT request against the ES template API. 
+ + Does a PUT request against `/_template/` + if es6 or `/_index_template/` if es7. + + :param template_name: index template name + :type template_name: string + :param template: index template + :type template: dict + :rtype: requests.models.Response + """ + url = '/'.join( + (self._url, '_index_template', template_name)) + data = json.dumps(template) + LOG.debug('Creating index template {} with data:\n{}'.format( + template_name, data)) + return self._req( + self._sess.put, url, data, None, deserialize=False) + + def put_first_index(self): + """Does a PUT request against the ES index API. + + Does a PUT request against `/-{now/d}-000001`. + + Creates a dated index with an alias for which it is the write index. + + :rtype: requests.models.Response + """ + # Percent encode the / (%2F) in the date math for the index name + index_string = "<{}{}>".format(self._index_name, "-{now%2Fd}-000001") + url = '/'.join((self._url, index_string)) + aliases = { + "aliases": { + self._index_name: { + "is_write_index": True + } + } + } + LOG.debug('Creating index {} with data:\n{}'.format( + index_string, json.dumps(aliases))) + return self._req( + self._sess.put, url, json.dumps(aliases), None, deserialize=False) + + def post_index_rollover(self): + """Does a POST request against the ES index API. + + Does a POST request against `//_rollover`. + + Performs a rollover of the index alias. + + :rtype: requests.models.Response + """ + url = '/'.join((self._url, self._index_name, '_rollover')) + self._req(self._sess.post, url, None, None, deserialize=False) + + def exists_index(self): + """Does a HEAD request against the ES index API. + + Does a HEAD request against `/`. + + Tests if an index or index alias exists. + + :rtype: Boolean + """ + url = '/'.join((self._url, self._index_name)) + param = {"allow_no_indices": "false"} + return self._req_exists(url, None, param) + + def is_index_alias(self): + """Does a HEAD request against the ES alias API. 
+ + Does a HEAD request against `/_alias/`. + + Tests if an index alias exists. + + :rtype: Boolean + """ + url = '/'.join((self._url, '_alias', self._index_name)) + return self._req_exists(url, None, None) + def put_mapping(self, mapping): """Does a PUT request against ES's mapping API. - The PUT request will be done against - `//_mapping/` + Does a PUT request against `//_mapping/` + + Creates or updates an index mapping. :mapping: body of the request :type mapping: dict @@ -168,7 +279,7 @@ def put_mapping(self, mapping): def get_index(self): """Does a GET request against ES's index API. - The GET request will be done against `/` + Does a GET request against `/` :rtype: requests.models.Response """ @@ -360,7 +471,7 @@ def total(self, begin, end, metric_types, filters, groupby, must = self._build_must(begin, end, metric_types, filters) should = self._build_should(filters) - composite = self._build_composite(groupby) if groupby else None + composite = self._build_composite(groupby) if composite: composite['size'] = self._chunk_size query = self._build_query(must, should, composite) diff --git a/cloudkitty/storage/v2/elasticsearch/exceptions.py b/cloudkitty/storage/v2/elasticsearch/exceptions.py index 201d7f9d..7497e67d 100644 --- a/cloudkitty/storage/v2/elasticsearch/exceptions.py +++ b/cloudkitty/storage/v2/elasticsearch/exceptions.py @@ -30,3 +30,11 @@ def __init__(self, index_name): super(IndexDoesNotExist, self).__init__( "Elasticsearch index {} does not exist".format(index_name) ) + + +class IndexAliasAlreadyExists(BaseElasticsearchException): + def __init__(self, index_name): + super(IndexAliasAlreadyExists, self).__init__( + "Elasticsearch index alias {} already exists as an index".format( + index_name) + ) diff --git a/cloudkitty/tests/storage/v2/elasticsearch/test_client.py b/cloudkitty/tests/storage/v2/elasticsearch/test_client.py index 8948be91..2feb7c17 100644 --- a/cloudkitty/tests/storage/v2/elasticsearch/test_client.py +++ 
b/cloudkitty/tests/storage/v2/elasticsearch/test_client.py @@ -20,6 +20,7 @@ from dateutil import tz from cloudkitty import dataframe +import cloudkitty.storage.v2.elasticsearch from cloudkitty.storage.v2.elasticsearch import client from cloudkitty.storage.v2.elasticsearch import exceptions @@ -181,6 +182,137 @@ def test_req_invalid_status_code(self): self.client._req, method_mock, None, None, None) + def test_req_unsafe(self): + url = '/endpoint' + data = {'1': 'one'} + params = {'v'} + resp_mock = mock.MagicMock() + resp_mock.status_code = 400 + method_mock = mock.MagicMock() + method_mock.return_value = resp_mock + req_resp = self.client._req_unsafe( + method_mock, url, data, params) + method_mock.assert_called_once_with( + url, data=data, params=params) + self.assertEqual(req_resp, resp_mock) + + def test_req_exists(self): + url = '/endpoint' + data = {'1': 'one'} + params = {'v'} + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.client._req_exists( + url, data=data, params=params) + hmock.assert_called_once_with( + url, data=data, params=params) + + def test_req_exists_true(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.assertTrue(self.client._req_exists( + url, data=None, params=None)) + + def test_req_exists_false(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 404 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.assertFalse(self.client._req_exists( + url, data=None, params=None)) + + def test_req_exists_exception(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 418 # I'm a teapot + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + 
self.assertRaises(exceptions.InvalidStatusCode, + self.client._req_exists, + url, data=None, params=None) + + def test_build_index_template(self): + index_pattern = "cloudkitty-*" + mapping = cloudkitty.storage.v2.elasticsearch.CLOUDKITTY_INDEX_MAPPING + component_templates = ["cloudkitty_settings"] + expected = { + "index_patterns": ["cloudkitty-*"], + "priority": 500, + "composed_of": component_templates, + "template": { + "mappings": mapping + } + } + self.assertEqual( + self.client.build_index_template( + index_pattern, component_templates, mapping), expected) + + def test_put_index_template(self): + template_name = 'test_template' + template = { + "index_patterns": ["index_name-*"], + "priority": 500, + "template": { + "mappings": "fake_mapping" + } + } + expected_data = \ + ('{"index_patterns": ["index_name-*"], "priority": 500, ' + '"template": {"mappings": "fake_mapping"}}') + with mock.patch.object(self.client, '_req') as rmock: + self.client.put_index_template( + template_name, template) + rmock.assert_called_once_with( + self.client._sess.put, + 'http://elasticsearch:9200/_index_template/test_template', + expected_data, None, deserialize=False) + + def test_put_first_index(self): + expected_data = '{"aliases": {"index_name": {"is_write_index": true}}}' + with mock.patch.object(self.client, '_req') as rmock: + self.client.put_first_index() + rmock.assert_called_once_with( + self.client._sess.put, + 'http://elasticsearch:9200/', + expected_data, None, deserialize=False) + + def test_post_index_rollover(self): + with mock.patch.object(self.client, '_req') as rmock: + self.client.post_index_rollover() + rmock.assert_called_once_with( + self.client._sess.post, + 'http://elasticsearch:9200/index_name/_rollover', + None, None, deserialize=False) + + def test_exists_index(self): + expected_param = {"allow_no_indices": "false"} + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + 
hmock.return_value = resp_mock + r = self.client.exists_index() + hmock.assert_called_once_with( + 'http://elasticsearch:9200/index_name', + data=None, params=expected_param) + self.assertTrue(r) + + def test_is_index_alias(self): + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + r = self.client.is_index_alias() + hmock.assert_called_once_with( + 'http://elasticsearch:9200/_alias/index_name', + data=None, params=None) + self.assertTrue(r) + def test_put_mapping(self): mapping = {'a': 'b'} with mock.patch.object(self.client, '_req') as rmock: diff --git a/cloudkitty/tests/storage/v2/es_utils.py b/cloudkitty/tests/storage/v2/es_utils.py index 40e27c38..1d806453 100644 --- a/cloudkitty/tests/storage/v2/es_utils.py +++ b/cloudkitty/tests/storage/v2/es_utils.py @@ -26,7 +26,10 @@ class FakeElasticsearchClient(client.ElasticsearchClient): def __init__(self, *args, **kwargs): kwargs["autocommit"] = False super(FakeElasticsearchClient, self).__init__(*args, **kwargs) - for method in ('get_index', 'put_mapping'): + for method in ('exists_index', + 'is_index_alias', + 'get_index', + 'put_mapping'): setattr(self, method, self.__base_response) @staticmethod diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 248ec857..87b62c24 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -242,12 +242,6 @@ function create_influxdb_database { fi } -function create_elasticsearch_index { - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then - curl -XPUT "${CLOUDKITTY_ELASTICSEARCH_HOST}/${CLOUDKITTY_ELASTICSEARCH_INDEX}" - fi -} - # init_cloudkitty() - Initialize CloudKitty database function init_cloudkitty { # Delete existing cache @@ -264,7 +258,6 @@ function init_cloudkitty { recreate_database cloudkitty utf8 create_influxdb_database - create_elasticsearch_index # Migrate cloudkitty database $CLOUDKITTY_BIN_DIR/cloudkitty-dbsync upgrade @@ -301,15 +294,42 
@@ function install_influx { sudo systemctl start influxdb || sudo systemctl restart influxdb } +# Remove Elasticsearch package if present +function _cleanup_elasticsearch_ubuntu { + if sudo dpkg --list elasticsearch; then + sudo dpkg --purge elasticsearch + fi + _cleanup_elasticsearch_data +} + +function _cleanup_elasticsearch_fedora { + if sudo rpm -q elasticsearch; then + sudo yum remove elasticsearch -y + fi + _cleanup_elasticsearch_data +} + +# Remove Elasticsearch data if present +function _cleanup_elasticsearch_data { + if [[ -d /var/lib/elasticsearch ]]; then + sudo rm -rf /var/lib/elasticsearch + fi + if [[ -d /etc/elasticsearch ]]; then + sudo rm -rf /etc/elasticsearch + fi +} + function install_elasticsearch_ubuntu { + _cleanup_elasticsearch_ubuntu sudo apt install -qy openjdk-8-jre - local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.3.deb) - sudo dpkg -i --skip-same-version ${elasticsearch_file} + local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-amd64.deb) + sudo dpkg -i ${elasticsearch_file} } function install_elasticsearch_fedora { + _cleanup_elasticsearch_fedora sudo yum install -y java-1.8.0-openjdk - local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.3.rpm) + local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-x86_64.rpm) sudo yum localinstall -y ${elasticsearch_file} } @@ -322,6 +342,10 @@ function install_elasticsearch { die $LINENO "Distribution must be Debian or Fedora-based" fi sudo systemctl start elasticsearch || sudo systemctl restart elasticsearch + echo "Waiting a minute for Elasticsearch to start..." + if ! 
wait_for_service 60 http://localhost:9200/; then + die $LINENO "Elasticsearch did not start" + fi } # install_cloudkitty() - Collect source and prepare diff --git a/doc/source/_static/cloudkitty.conf.sample b/doc/source/_static/cloudkitty.conf.sample index 9c00a17b..08f8b556 100644 --- a/doc/source/_static/cloudkitty.conf.sample +++ b/doc/source/_static/cloudkitty.conf.sample @@ -1353,6 +1353,14 @@ # Elasticsearch index to use. Defaults to "cloudkitty". (string value) #index_name = cloudkitty +# Elasticsearch template name to use. Defaults to +# "cloudkitty_mapping". (string value) +#template_name = cloudkitty_mapping + +# List of Elasticsearch component template names to include in the +# index template. (list value) +#component_templates = + # Set to true to allow insecure HTTPS connections to Elasticsearch # (boolean value) #insecure = false diff --git a/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml b/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml new file mode 100644 index 00000000..072a10ac --- /dev/null +++ b/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml @@ -0,0 +1,17 @@ +--- +prelude: > + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. +features: + - | + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. + Provides an index template to manage cloudkitty document mappings with the + option to include user provided component templates. + Uses dated indices and an index alias to allow for better management of data. +upgrade: + - | + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. + The config setting 'index_name' now refers to the index alias used to + reference the latest cloudkitty index. If no index or index alias exists on + first run then they will be created. 
If 'index_name' already exists and is + not an alias, then initialisation will fail; a simple solution is to use a + new 'index_name' and reindex existing data into the first index created.