
Commit 0c515e9

Add test for migration (#17993)
1 parent b139feb commit 0c515e9

10 files changed: +452 -77 lines changed

ydb/core/viewer/tests/canondata/result.json

Lines changed: 67 additions & 67 deletions
Large diffs are not rendered by default.
ydb/tests/functional/config/test_config_migration.py (new file)

Lines changed: 280 additions & 0 deletions
@@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
import logging
import yaml
import time
from hamcrest import assert_that

from ydb.tests.library.common.types import Erasure
import ydb.tests.library.common.cms as cms
from ydb.tests.library.clients.kikimr_http_client import SwaggerClient
from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.library.clients.kikimr_config_client import ConfigClient
from ydb.tests.library.clients.kikimr_dynconfig_client import DynConfigClient
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.kv.helpers import create_kv_tablets_and_wait_for_start
from ydb.public.api.protos.ydb_status_codes_pb2 import StatusIds
from ydb.tests.library.harness.util import LogLevels

import ydb.public.api.protos.ydb_config_pb2 as config
import ydb.public.api.protos.draft.ydb_dynamic_config_pb2 as dynconfig

logger = logging.getLogger(__name__)


def generate_config(dynconfig_client):
    generate_config_response = dynconfig_client.fetch_startup_config()
    assert_that(generate_config_response.operation.status == StatusIds.SUCCESS)

    result = dynconfig.FetchStartupConfigResult()
    generate_config_response.operation.result.Unpack(result)
    return result.config


def fetch_config_dynconfig(dynconfig_client):
    fetch_config_response = dynconfig_client.fetch_config()
    assert_that(fetch_config_response.operation.status == StatusIds.SUCCESS)

    result = dynconfig.GetConfigResult()
    fetch_config_response.operation.result.Unpack(result)
    if result.config[0] == "":
        return None
    else:
        return result.config[0]


def replace_config(config_client, config):
    replace_config_response = config_client.replace_config(config)
    assert_that(replace_config_response.operation.status == StatusIds.SUCCESS)


def replace_config_dynconfig(dynconfig_client, config):
    replace_config_response = dynconfig_client.replace_config(config)
    assert_that(replace_config_response.operation.status == StatusIds.SUCCESS)


def value_for(key, tablet_id):
    return "Value: <key = {key}, tablet_id = {tablet_id}>".format(
        key=key, tablet_id=tablet_id)


def fetch_config(config_client):
    fetch_config_response = config_client.fetch_all_configs()
    assert_that(fetch_config_response.operation.status == StatusIds.SUCCESS)

    result = config.FetchConfigResult()
    fetch_config_response.operation.result.Unpack(result)
    if result.config:
        return result.config[0].config
    else:
        return None


class TestConfigMigrationToV2(object):
    erasure = Erasure.BLOCK_4_2
    separate_node_configs = True
    metadata_section = {
        "kind": "MainConfig",
        "version": 0,
        "cluster": "",
    }

    @classmethod
    def setup_class(cls):
        nodes_count = 8 if cls.erasure == Erasure.BLOCK_4_2 else 9
        log_configs = {
            'BS_NODE': LogLevels.DEBUG,
            'GRPC_SERVER': LogLevels.DEBUG,
            'GRPC_PROXY': LogLevels.DEBUG,
            'TX_PROXY': LogLevels.DEBUG,
            'TICKET_PARSER': LogLevels.DEBUG,
            'BS_CONTROLLER': LogLevels.DEBUG,
            'TABLET_EXECUTOR': LogLevels.DEBUG,
            'TABLET_MAIN': LogLevels.DEBUG,
        }
        cls.configurator = KikimrConfigGenerator(
            cls.erasure,
            nodes=nodes_count,
            use_in_memory_pdisks=False,
            separate_node_configs=cls.separate_node_configs,
            simple_config=True,
            extra_grpc_services=['config'],
            additional_log_configs=log_configs,
            explicit_hosts_and_host_configs=True,
        )

        cls.cluster = KiKiMR(configurator=cls.configurator)
        cls.cluster.start()

        cms.request_increase_ratio_limit(cls.cluster.client)
        host = cls.cluster.nodes[1].host
        grpc_port = cls.cluster.nodes[1].port
        cls.swagger_client = SwaggerClient(host, cls.cluster.nodes[1].mon_port)
        cls.config_client = ConfigClient(host, grpc_port)
        cls.dynconfig_client = DynConfigClient(host, grpc_port)

    @classmethod
    def teardown_class(cls):
        cls.cluster.stop()

    def check_kikimr_is_operational(self, table_path, tablet_ids):
        for partition_id, tablet_id in enumerate(tablet_ids):
            write_resp = self.cluster.kv_client.kv_write(
                table_path, partition_id, "key", value_for("key", tablet_id)
            )
            assert_that(write_resp.operation.status == StatusIds.SUCCESS)

            read_resp = self.cluster.kv_client.kv_read(
                table_path, partition_id, "key"
            )
            assert_that(read_resp.operation.status == StatusIds.SUCCESS)

    def wait_for_all_nodes_start(self, expected_nodes_count, timeout_seconds=120):
        start_time = time.time()
        logger.info(f"Waiting for {expected_nodes_count} nodes to start and report Green status...")
        last_exception = None
        up_nodes_count = 0
        reported_nodes = 0

        while time.time() - start_time < timeout_seconds:
            try:
                nodes_info = self.swagger_client.nodes_info()
                if nodes_info and 'Nodes' in nodes_info:
                    current_up_nodes = 0
                    reported_nodes = len(nodes_info['Nodes'])
                    for node_status in nodes_info['Nodes']:
                        system_state = node_status.get('SystemState', {})
                        if system_state.get('SystemState') == 'Green':
                            current_up_nodes += 1
                    up_nodes_count = current_up_nodes

                    logger.debug(f"Node status check: {up_nodes_count}/{expected_nodes_count} Green, {reported_nodes} reported.")
                    if up_nodes_count == expected_nodes_count:
                        logger.info(f"All {expected_nodes_count} nodes reported Green status.")
                        return True
                else:
                    logger.debug("Waiting for nodes: Node info not available or empty in response.")

            except Exception as e:
                logger.debug(f"Error fetching node status, retrying: {e}")
                last_exception = e

            time.sleep(2)

        error_message = (
            f"Timeout: Only {up_nodes_count} out of {expected_nodes_count} nodes "
            f"reached 'Green' status within {timeout_seconds} seconds. "
            f"({reported_nodes} nodes reported in last check)."
        )
        if last_exception:
            error_message += f" Last exception: {last_exception}"

        try:
            final_nodes_info = self.swagger_client.nodes_info()
            error_message += f" Final status info: {final_nodes_info}"
        except Exception as final_e:
            error_message += f" Could not get final status: {final_e}"

        raise TimeoutError(error_message)

    def test_migration_to_v2(self):
        table_path = '/Root/mydb/mytable_migration'
        number_of_tablets = 5

        tablet_ids = create_kv_tablets_and_wait_for_start(
            self.cluster.client,
            self.cluster.kv_client,
            self.swagger_client,
            number_of_tablets,
            table_path,
            timeout_seconds=3
        )

        # 1 step: fetch config with dynconfig client
        fetched_config = fetch_config_dynconfig(self.dynconfig_client)
        if fetched_config is None:
            logger.info("No config found, generating it")
            # 2 step: generate config
            generated_config = generate_config(self.dynconfig_client)
            parsed_generated_config = yaml.safe_load(generated_config)
            metadata_section = {
                "version": 0,
                "cluster": "",
                "kind": "MainConfig",
            }
            parsed_fetched_config = {
                "metadata": metadata_section,
                "config": parsed_generated_config
            }
            fetched_config = yaml.dump(parsed_fetched_config)
            logger.debug(f"Generated config: {fetched_config}")

        # 3 step: add feature flag
        parsed_fetched_config = yaml.safe_load(fetched_config)
        parsed_fetched_config["config"]["feature_flags"] = dict()
        parsed_fetched_config["config"]["feature_flags"]["switch_to_config_v2"] = True

        # 4 step: manually replace config on nodes:
        self.cluster.overwrite_configs(parsed_fetched_config)

        # 5 step: use config dir
        self.cluster.enable_config_dir()

        # 6 step: restart nodes
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        # 7 step: replace config
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))
        time.sleep(2)

        # 8 step: fetch config
        fetched_config = fetch_config(self.config_client)
        assert_that(fetched_config is not None)
        logger.debug(f"Fetched config: {fetched_config}")
        parsed_fetched_config = yaml.safe_load(fetched_config)

        # 9 step: enable self-management
        parsed_fetched_config["config"]["self_management_config"] = dict()
        parsed_fetched_config["config"]["self_management_config"]["enabled"] = True
        parsed_fetched_config["metadata"]["version"] = 1

        # 10 step: replace config
        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))

        # 11 step: restart nodes
        logger.debug("Restarting nodes")
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        # 11.5 step: fetch config
        logger.debug("Fetching config")
        fetched_config = fetch_config(self.config_client)
        assert_that(fetched_config is not None)
        parsed_fetched_config = yaml.safe_load(fetched_config)

        # 12 step: move security_config to root
        parsed_fetched_config["config"]["security_config"] = parsed_fetched_config["config"]["domains_config"].pop("security_config")

        # 13 step: remove unnecessary fields
        parsed_fetched_config["config"].pop("domains_config")
        parsed_fetched_config["config"].pop("blob_storage_config")
        parsed_fetched_config["metadata"]["version"] = 2

        # 14 step: replace config
        logger.debug(f"Replacing config: {yaml.dump(parsed_fetched_config)}")
        replace_config(self.config_client, yaml.dump(parsed_fetched_config))

        self.check_kikimr_is_operational(table_path, tablet_ids)

        # 14* step: restart nodes
        logger.debug("Restarting nodes")
        self.cluster.restart_nodes()
        self.wait_for_all_nodes_start(len(self.cluster.nodes))

        self.check_kikimr_is_operational(table_path, tablet_ids)
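
The core of the migration is the V1 envelope that steps 2 and 3 build around the generated startup config: a metadata section plus the switch_to_config_v2 feature flag. A minimal, self-contained sketch of that transformation follows; the YAML payload is an illustrative stand-in, not a real cluster config.

import yaml

# Illustrative stand-in for the YAML string returned by fetch_startup_config();
# a real startup config is far larger.
generated_config = """
actor_system_config:
  use_auto_config: true
"""

# Step 2: wrap the generated config in a V1 main-config document with metadata.
document = {
    "metadata": {"kind": "MainConfig", "version": 0, "cluster": ""},
    "config": yaml.safe_load(generated_config),
}

# Step 3: enable the V2 switch before the document is written out to the nodes.
document["config"].setdefault("feature_flags", {})["switch_to_config_v2"] = True

print(yaml.dump(document))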

ydb/tests/functional/config/ya.make

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ TEST_SRCS(
     test_config_with_metadata.py
     test_generate_dynamic_config.py
     test_distconf.py
+    test_config_migration.py
 )

 SPLIT_FACTOR(10)

ydb/tests/library/clients/kikimr_dynconfig_client.py

Lines changed: 4 additions & 0 deletions
@@ -61,6 +61,10 @@ def fetch_startup_config(self):
         request = dynamic_config_api.FetchStartupConfigRequest()
         return self.invoke(request, 'FetchStartupConfig')

+    def fetch_config(self):
+        request = dynamic_config_api.GetConfigRequest()
+        return self.invoke(request, 'GetConfig')
+
     def close(self):
         self._channel.close()

ydb/tests/library/clients/kikimr_http_client.py

Lines changed: 5 additions & 0 deletions
@@ -172,3 +172,8 @@ def tablet_info(self, tablet_type=None):
         return self.__http_get_and_parse_json(
             "/json/tabletinfo", timeout=self.__timeout
         )
+
+    def nodes_info(self):
+        return self.__http_get_and_parse_json(
+            "/json/nodes", timeout=self.__timeout
+        )
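
The new nodes_info() helper exposes the viewer's /json/nodes endpoint to the test harness; the migration test polls it to decide when every node has come back after a restart. A condensed sketch of that usage, with field names taken from the test above (swagger_client is assumed to be an already constructed SwaggerClient):

def count_green_nodes(swagger_client):
    # /json/nodes returns a dict with a 'Nodes' list; each entry carries a
    # 'SystemState' section whose own 'SystemState' field is the health color.
    nodes_info = swagger_client.nodes_info()
    if not nodes_info or 'Nodes' not in nodes_info:
        return 0
    return sum(
        1 for node in nodes_info['Nodes']
        if node.get('SystemState', {}).get('SystemState') == 'Green'
    )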

ydb/tests/library/harness/daemon.py

Lines changed: 6 additions & 0 deletions
@@ -79,6 +79,12 @@ def __init__(
         self.__stderr_file = None
         self.__aux_file = None

+    def update_command(self, new_command):
+        new_command_tuple = tuple(new_command)
+        if self.__command != new_command_tuple:
+            self.logger.info(f"Updating daemon command to {new_command_tuple}")
+            self.__command = new_command_tuple
+
     def __open_output_files(self):
         self.__stdout_file = open(self.__stdout_file_name, mode='ab')
         self.__stderr_file = open(self.__stderr_file_name, mode='ab')

ydb/tests/library/harness/kikimr_config.py

Lines changed: 8 additions & 3 deletions
@@ -170,6 +170,7 @@ def __init__(
             simple_config=False,
             breakpad_minidumps_path=None,
             breakpad_minidumps_script=None,
+            explicit_hosts_and_host_configs=False,
     ):
         if extra_feature_flags is None:
             extra_feature_flags = []
@@ -180,8 +181,10 @@ def __init__(
         self.use_self_management = use_self_management
         self.simple_config = simple_config
         self.suppress_version_check = suppress_version_check
+        self.explicit_hosts_and_host_configs = explicit_hosts_and_host_configs
         if use_self_management:
             self.suppress_version_check = False
+            self.explicit_hosts_and_host_configs = True
         self._pdisk_store_path = pdisk_store_path
         self.static_pdisk_size = static_pdisk_size
         self.app_config = config_pb2.TAppConfig()
@@ -480,14 +483,16 @@ def __init__(
         self.yaml_config["kafka_proxy_config"] = kafka_proxy_config

         self.full_config = dict()
+        if self.explicit_hosts_and_host_configs:
+            self._add_host_config_and_hosts()
+            self.yaml_config.pop("nameservice_config")
         if self.use_self_management:
             self.yaml_config["domains_config"].pop("security_config")
             self.yaml_config["default_disk_type"] = "ROT"
             self.yaml_config["fail_domain_type"] = "rack"
-            self._add_host_config_and_hosts()
             self.yaml_config["erasure"] = self.yaml_config.pop("static_erasure")

-        for name in ['blob_storage_config', 'domains_config', 'nameservice_config', 'system_tablets', 'grpc_config',
+        for name in ['blob_storage_config', 'domains_config', 'system_tablets', 'grpc_config',
                      'channel_profile_config', 'interconnect_config']:
             del self.yaml_config[name]
         if self.simple_config:
@@ -780,5 +785,5 @@ def __build(self):
         self.yaml_config["blob_storage_config"]["service_set"]["groups"][0]["rings"].append({"fail_domains": []})

         self._add_state_storage_config()
-        if not self.use_self_management:
+        if not self.use_self_management and not self.explicit_hosts_and_host_configs:
             self._initialize_pdisks_info()
