
Commit e98c189

Add test for migration
1 parent 0f55ef2 commit e98c189


9 files changed: +389 -9 lines changed

@@ -0,0 +1,284 @@
# -*- coding: utf-8 -*-
import logging
import yaml
import time
from hamcrest import assert_that

from ydb.tests.library.common.types import Erasure
import ydb.tests.library.common.cms as cms
from ydb.tests.library.clients.kikimr_http_client import SwaggerClient
from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.clients.kikimr_config_client import ConfigClient
from ydb.tests.library.clients.kikimr_dynconfig_client import DynConfigClient
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.kv.helpers import create_kv_tablets_and_wait_for_start
from ydb.public.api.protos.ydb_status_codes_pb2 import StatusIds
from ydb.tests.library.harness.util import LogLevels

from ydb.tests.library.harness.param_constants import ydb_cli_path

import ydb.public.api.protos.ydb_config_pb2 as config
import ydb.public.api.protos.draft.ydb_dynamic_config_pb2 as dynconfig

import yatest

logger = logging.getLogger(__name__)


def generate_config(dynconfig_client):
    generate_config_response = dynconfig_client.fetch_startup_config()
    assert_that(generate_config_response.operation.status == StatusIds.SUCCESS)

    result = dynconfig.FetchStartupConfigResult()
    generate_config_response.operation.result.Unpack(result)
    return result.config


def fetch_config_dynconfig(dynconfig_client):
    fetch_config_response = dynconfig_client.fetch_config()
    assert_that(fetch_config_response.operation.status == StatusIds.SUCCESS)

    result = dynconfig.GetConfigResult()
    fetch_config_response.operation.result.Unpack(result)
    if result.config[0] == "":
        return None
    else:
        return result.config[0]


def replace_config(config_client, config):
    replace_config_response = config_client.replace_config(config)
    assert_that(replace_config_response.operation.status == StatusIds.SUCCESS)


def replace_config_dynconfig(dynconfig_client, config):
    replace_config_response = dynconfig_client.replace_config(config)
    assert_that(replace_config_response.operation.status == StatusIds.SUCCESS)


def value_for(key, tablet_id):
    return "Value: <key = {key}, tablet_id = {tablet_id}>".format(
        key=key, tablet_id=tablet_id)


def fetch_config(config_client):
    fetch_config_response = config_client.fetch_all_configs()
    assert_that(fetch_config_response.operation.status == StatusIds.SUCCESS)

    result = config.FetchConfigResult()
    fetch_config_response.operation.result.Unpack(result)
    if result.config:
        return result.config[0].config
    else:
        return None


class TestConfigMigrationToV2(object):
    erasure = Erasure.BLOCK_4_2
    separate_node_configs = True
    metadata_section = {
        "kind": "MainConfig",
        "version": 0,
        "cluster": "",
    }

    @classmethod
    def setup_class(cls):
        nodes_count = 8 if cls.erasure == Erasure.BLOCK_4_2 else 9
        log_configs = {
            'BS_NODE': LogLevels.DEBUG,
            'GRPC_SERVER': LogLevels.DEBUG,
            'GRPC_PROXY': LogLevels.DEBUG,
            'TX_PROXY': LogLevels.DEBUG,
            'TICKET_PARSER': LogLevels.DEBUG,
            'BS_CONTROLLER': LogLevels.DEBUG,
            'TABLET_EXECUTOR': LogLevels.DEBUG,
            'TABLET_MAIN': LogLevels.DEBUG,
        }
        cls.configurator = KikimrConfigGenerator(
            cls.erasure,
            nodes=nodes_count,
            use_in_memory_pdisks=False,
            separate_node_configs=cls.separate_node_configs,
            simple_config=True,
            extra_grpc_services=['config'],
            additional_log_configs=log_configs,
            explicit_hosts_and_host_configs=True,
        )

        cls.cluster = KiKiMR(configurator=cls.configurator)
        cls.cluster.start()

        cms.request_increase_ratio_limit(cls.cluster.client)
        host = cls.cluster.nodes[1].host
        grpc_port = cls.cluster.nodes[1].port
        cls.swagger_client = SwaggerClient(host, cls.cluster.nodes[1].mon_port)
        cls.config_client = ConfigClient(host, grpc_port)
        cls.dynconfig_client = DynConfigClient(host, grpc_port)

    @classmethod
    def teardown_class(cls):
        cls.cluster.stop()

    def check_kikimr_is_operational(self, table_path, tablet_ids):
        for partition_id, tablet_id in enumerate(tablet_ids):
            write_resp = self.cluster.kv_client.kv_write(
                table_path, partition_id, "key", value_for("key", tablet_id)
            )
            assert_that(write_resp.operation.status == StatusIds.SUCCESS)

            read_resp = self.cluster.kv_client.kv_read(
                table_path, partition_id, "key"
            )
            assert_that(read_resp.operation.status == StatusIds.SUCCESS)

    def wait_for_all_nodes_start(self, expected_nodes_count, timeout_seconds=120):
        start_time = time.time()
        logger.info(f"Waiting for {expected_nodes_count} nodes to start and report Green status...")
        last_exception = None
        up_nodes_count = 0
        reported_nodes = 0

        while time.time() - start_time < timeout_seconds:
            try:
                nodes_info = self.swagger_client.nodes_info()
                if nodes_info and 'Nodes' in nodes_info:
                    current_up_nodes = 0
                    reported_nodes = len(nodes_info['Nodes'])
                    for node_status in nodes_info['Nodes']:
                        system_state = node_status.get('SystemState', {})
                        if system_state.get('SystemState') == 'Green':
                            current_up_nodes += 1
                    up_nodes_count = current_up_nodes

                    logger.debug(f"Node status check: {up_nodes_count}/{expected_nodes_count} Green, {reported_nodes} reported.")
                    if up_nodes_count == expected_nodes_count:
                        logger.info(f"All {expected_nodes_count} nodes reported Green status.")
                        return True
                else:
                    logger.debug("Waiting for nodes: Node info not available or empty in response.")

            except Exception as e:
                logger.debug(f"Error fetching node status, retrying: {e}")
                last_exception = e

            time.sleep(2)

        error_message = (
            f"Timeout: Only {up_nodes_count} out of {expected_nodes_count} nodes "
            f"reached 'Green' status within {timeout_seconds} seconds. "
            f"({reported_nodes} nodes reported in last check)."
        )
        if last_exception:
            error_message += f" Last exception: {last_exception}"

        try:
            final_nodes_info = self.swagger_client.nodes_info()
            error_message += f" Final status info: {final_nodes_info}"
        except Exception as final_e:
            error_message += f" Could not get final status: {final_e}"

        raise TimeoutError(error_message)

    def test_migration_to_v2(self):
        table_path = '/Root/mydb/mytable_migration'
        number_of_tablets = 5

        tablet_ids = create_kv_tablets_and_wait_for_start(
            self.cluster.client,
            self.cluster.kv_client,
            self.swagger_client,
            number_of_tablets,
            table_path,
            timeout_seconds=3
        )

        # 1 step: fetch config with dynconfig client
        fetched_config = fetch_config_dynconfig(self.dynconfig_client)
        if fetched_config is None:
            logger.info("No config found, generating it")
            # 2 step: generate config
            generated_config = generate_config(self.dynconfig_client)
            parsed_generated_config = yaml.safe_load(generated_config)
            metadata_section = {
                "version": 0,
                "cluster": "",
                "kind": "MainConfig",
            }
            parsed_fetched_config = {
                "metadata": metadata_section,
                "config": parsed_generated_config
            }
            fetched_config = yaml.dump(parsed_fetched_config)
            logger.debug(f"Generated config: {fetched_config}")

        # 3 step: add feature flag
        parsed_fetched_config = yaml.safe_load(fetched_config)
        parsed_fetched_config["config"]["feature_flags"] = dict()
        parsed_fetched_config["config"]["feature_flags"]["switch_to_config_v2"] = True

        # 4 step: manually replace config on nodes:
        self.cluster.overwrite_configs(parsed_fetched_config)

        # 5 step: use config dir
        self.cluster.enable_config_dir()

        # 6 step: restart nodes
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        # 7 step: replace config
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))
        time.sleep(2)

        # 8 step: fetch config
        fetched_config = fetch_config(self.config_client)
        assert_that(fetched_config is not None)
        logger.debug(f"Fetched config: {fetched_config}")
        parsed_fetched_config = yaml.safe_load(fetched_config)

        # 9 step: enable self-management
        parsed_fetched_config["config"]["self_management_config"] = dict()
        parsed_fetched_config["config"]["self_management_config"]["enabled"] = True
        parsed_fetched_config["metadata"]["version"] = 1

        # 10 step: replace config
        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))

        # 11 step: restart nodes
        logger.debug("Restarting nodes")
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        # 11.5 step: fetch config
        logger.debug("Fetching config")
        fetched_config = fetch_config(self.config_client)
        assert_that(fetched_config is not None)
        parsed_fetched_config = yaml.safe_load(fetched_config)

        # 12 step: move security_config to root
        parsed_fetched_config["config"]["security_config"] = parsed_fetched_config["config"]["domains_config"].pop("security_config")

        # 13 step: remove unnecessary fields
        parsed_fetched_config["config"].pop("domains_config")
        parsed_fetched_config["config"].pop("blob_storage_config")
        parsed_fetched_config["metadata"]["version"] = 2

        # 14 step: replace config
        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        # 14* step: restart nodes
        logger.debug("Restarting nodes")
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)
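
For reference, the document the test hands to replace_config looks roughly like this by step 10. This is an illustrative sketch only: the metadata values and the toggled fields mirror the test above, while the rest of the fetched cluster config is abridged.

# Illustrative sketch of the YAML document built by the test (values from the test;
# "..." stands for the untouched remainder of the fetched cluster config).
migration_config = {
    "metadata": {
        "kind": "MainConfig",
        "cluster": "",
        "version": 1,  # bumped to 1 before step 10 and to 2 before step 14
    },
    "config": {
        "feature_flags": {"switch_to_config_v2": True},   # step 3
        "self_management_config": {"enabled": True},      # step 9
        # ... remaining sections of the fetched config stay as returned ...
    },
}
# replace_config(self.config_client, yaml.dump(migration_config))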

ydb/tests/functional/config/ya.make

+1
@@ -4,6 +4,7 @@ TEST_SRCS(
     test_config_with_metadata.py
     test_generate_dynamic_config.py
     test_distconf.py
+    test_config_migration.py
 )

 SPLIT_FACTOR(10)

ydb/tests/library/clients/kikimr_dynconfig_client.py

+4
@@ -61,6 +61,10 @@ def fetch_startup_config(self):
         request = dynamic_config_api.FetchStartupConfigRequest()
         return self.invoke(request, 'FetchStartupConfig')

+    def fetch_config(self):
+        request = dynamic_config_api.GetConfigRequest()
+        return self.invoke(request, 'GetConfig')
+
     def close(self):
         self._channel.close()
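
The new fetch_config method is what fetch_config_dynconfig in the test above calls. A condensed usage sketch, assuming a DynConfigClient already connected to a node and the dynconfig/StatusIds imports from the test:

# Fetch the stored dynamic config and unpack it from the operation result.
response = dynconfig_client.fetch_config()
assert_that(response.operation.status == StatusIds.SUCCESS)

result = dynconfig.GetConfigResult()
response.operation.result.Unpack(result)
main_config_yaml = result.config[0]  # empty string when no dynamic config is stored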

ydb/tests/library/clients/kikimr_http_client.py

+5
@@ -172,3 +172,8 @@ def tablet_info(self, tablet_type=None):
         return self.__http_get_and_parse_json(
             "/json/tabletinfo", timeout=self.__timeout
         )
+
+    def nodes_info(self):
+        return self.__http_get_and_parse_json(
+            "/json/nodes", timeout=self.__timeout
+        )
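
nodes_info backs the wait_for_all_nodes_start helper in the new test. A condensed sketch of how its result is consumed there, assuming the /json/nodes payload shape the test relies on:

# Count nodes whose viewer SystemState is reported as Green.
nodes_info = swagger_client.nodes_info()
green_nodes = 0
for node_status in nodes_info.get('Nodes', []):
    if node_status.get('SystemState', {}).get('SystemState') == 'Green':
        green_nodes += 1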

ydb/tests/library/harness/daemon.py

+6
@@ -79,6 +79,12 @@ def __init__(
         self.__stderr_file = None
         self.__aux_file = None

+    def update_command(self, new_command):
+        new_command_tuple = tuple(new_command)
+        if self.__command != new_command_tuple:
+            self.logger.info(f"Updating daemon command from {self.__command} to {new_command_tuple}")
+            self.__command = new_command_tuple
+
     def __open_output_files(self):
         self.__stdout_file = open(self.__stdout_file_name, mode='ab')
         self.__stderr_file = open(self.__stderr_file_name, mode='ab')
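
update_command only swaps the stored launch command; restarting is left to the caller. A hypothetical usage sketch (the stop/start calls are assumptions about the harness daemon API, not part of this commit):

# Hypothetical: refresh the command line, e.g. after switching a node to a
# config-dir based startup, then restart so the new arguments take effect.
daemon.update_command(new_command)  # stores and logs the command only if it changed
daemon.stop()
daemon.start()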

ydb/tests/library/harness/kikimr_config.py

+7 -2
@@ -170,6 +170,7 @@ def __init__(
             simple_config=False,
             breakpad_minidumps_path=None,
             breakpad_minidumps_script=None,
+            explicit_hosts_and_host_configs=False,
     ):
         if extra_feature_flags is None:
             extra_feature_flags = []
@@ -180,8 +181,10 @@ def __init__(
         self.use_self_management = use_self_management
         self.simple_config = simple_config
         self.suppress_version_check = suppress_version_check
+        self.explicit_hosts_and_host_configs = explicit_hosts_and_host_configs
         if use_self_management:
             self.suppress_version_check = False
+            self.explicit_hosts_and_host_configs = True
         self._pdisk_store_path = pdisk_store_path
         self.static_pdisk_size = static_pdisk_size
         self.app_config = config_pb2.TAppConfig()
@@ -480,11 +483,13 @@ def __init__(
             self.yaml_config["kafka_proxy_config"] = kafka_proxy_config

         self.full_config = dict()
+        if self.explicit_hosts_and_host_configs:
+            self._add_host_config_and_hosts()
+            self.yaml_config.pop("nameservice_config")
         if self.use_self_management:
             self.yaml_config["domains_config"].pop("security_config")
             self.yaml_config["default_disk_type"] = "ROT"
             self.yaml_config["fail_domain_type"] = "rack"
-            self._add_host_config_and_hosts()
             self.yaml_config["erasure"] = self.yaml_config.pop("static_erasure")

         for name in ['blob_storage_config', 'domains_config', 'nameservice_config', 'system_tablets', 'grpc_config',
@@ -780,5 +785,5 @@ def __build(self):
             self.yaml_config["blob_storage_config"]["service_set"]["groups"][0]["rings"].append({"fail_domains": []})

         self._add_state_storage_config()
-        if not self.use_self_management:
+        if not self.use_self_management and not self.explicit_hosts_and_host_configs:
             self._initialize_pdisks_info()