Skip to content

Commit ff9621d

Browse files
Merge pull request #3828 from mariadb-corporation/feat/MCOL-6194-failover-conf
Configurable failover interval
2 parents 5716ee0 + 922886b commit ff9621d

File tree

12 files changed

+246
-25
lines changed

12 files changed

+246
-25
lines changed

cmapi/cmapi_server/__main__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from cmapi_server.managers.certificate import CertificateManager
3232
from cmapi_server.managers.process import MCSProcessManager
3333
from failover.node_monitor import NodeMonitor
34+
from failover.config import Config
3435
from mcs_node_control.models.dbrm_socket import SOCK_TIMEOUT, DBRMSocketHandler
3536
from mcs_node_control.models.node_config import NodeConfig
3637
from tracing.trace_tool import register_tracing_tools
@@ -97,7 +98,8 @@ class FailoverBackgroundThread(plugins.SimplePlugin):
9798

9899
def __init__(self, bus, turned_on):
99100
super().__init__(bus)
100-
self.node_monitor = NodeMonitor(agent=FailoverAgent())
101+
sampling_interval = Config().getFailoverTimeoutSeconds()
102+
self.node_monitor = NodeMonitor(agent=FailoverAgent(), samplingInterval=sampling_interval)
101103
self.running = False
102104
self.turned_on = turned_on
103105
if self.turned_on:

cmapi/cmapi_server/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
# TOTP secret key
4343
SECRET_KEY = 'MCSIsTheBestEver' # not just a random string! (base32)
4444

45+
DEFAULT_FAILOVER_SAMPLING_INTERVAL_SECS = 30
46+
4547

4648
# network constants
4749
# according to https://www.ibm.com/docs/en/storage-sentinel/1.1.2?topic=installation-map-your-local-host-loopback-address

cmapi/cmapi_server/controllers/dispatcher.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
CommitController, ConfigController, ExtentMapController,
1010
LoggingConfigController, NodeController, NodeProcessController,
1111
RollbackController, ShutdownController, StartController, StatusController,
12+
CmapiConfigController,
1213
)
1314
from cmapi_server.controllers.s3dataload import S3DataLoadController
1415

@@ -205,7 +206,7 @@
205206
)
206207

207208

208-
# /_version/cluster/node/ (POST, PUT)
209+
# /_version/cluster/load_s3data (POST, PUT)
209210
dispatcher.connect(name = 'cluster_load_s3data',
210211
route = f'/cmapi/{_version}/cluster/load_s3data',
211212
action = 'load_s3data',
@@ -482,8 +483,16 @@
482483
conditions = {'method': ['PUT']}
483484
)
484485

486+
# /_version/cmapi_config (PATCH)
487+
dispatcher.connect(
488+
name = 'cmapi_config',
489+
route = f'/cmapi/{_version}/cmapi_config',
490+
action = 'patch_cmapi_config',
491+
controller = CmapiConfigController(),
492+
conditions = {'method': ['PATCH']}
493+
)
485494

486-
def jsonify_error(status, message, traceback, version): \
495+
def jsonify_error(status, message, traceback, version):
487496
# pylint: disable=unused-argument
488497
"""JSONify all CherryPy error responses (created by raising the
489498
cherrypy.HTTPError exception)

cmapi/cmapi_server/controllers/endpoints.py

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
from copy import deepcopy
77
from datetime import datetime
88
from pathlib import Path
9+
from typing import Optional
910

1011
import cherrypy
1112
import pyotp
1213
import requests
14+
from pydantic import BaseModel, Field, ValidationError, model_validator
15+
1316
from mcs_node_control.models.dbrm import set_cluster_mode
1417
from mcs_node_control.models.node_config import NodeConfig
1518
from mcs_node_control.models.node_status import NodeStatus
16-
from pydantic import ValidationError
17-
1819
from cmapi_server.constants import (
1920
CMAPI_PACKAGE_NAME,
2021
CMAPI_PORT,
@@ -30,8 +31,10 @@
3031
SECRET_KEY,
3132
)
3233
from cmapi_server.controllers.api_clients import NodeControllerClient
34+
from cmapi_server import helpers
3335
from cmapi_server.controllers.error import APIError
3436
from cmapi_server.exceptions import CMAPIBasicError, cmapi_error_to_422
37+
from cmapi_server.exceptions import validate_or_422, exc_to_422
3538
from cmapi_server.controllers.request_models import (
3639
ConfigPutRequestRootModel, StatefulConfigPutRequestModel,
3740
)
@@ -1997,12 +2000,13 @@ def put_stateful_config(self):
19972000
log_begin(module_logger, func_name)
19982001

19992002
request_body = cherrypy.request.json
2000-
try:
2001-
request_stateful_config = StatefulConfigModel.model_validate(
2002-
request_body.get('stateful_config_dict')
2003-
)
2004-
except ValidationError as exp:
2005-
raise_422_error(module_logger, func_name,f'Invalid request body: {exp.errors()}')
2003+
request_stateful_config = validate_or_422(
2004+
StatefulConfigModel,
2005+
request_body.get('stateful_config_dict'),
2006+
module_logger,
2007+
func_name,
2008+
prefix='Invalid request body',
2009+
)
20062010

20072011
success = AppStatefulConfig.apply_update(request_stateful_config)
20082012
if not success:
@@ -2014,3 +2018,52 @@ def put_stateful_config(self):
20142018
)
20152019

20162020
return {'timestamp': str(datetime.now()), 'success': success}
2021+
2022+
2023+
class CmapiConfigPatchModel(BaseModel):
2024+
failover_sampling_interval_seconds: Optional[int] = Field(default=None, ge=1)
2025+
2026+
@model_validator(mode='after')
2027+
def ensure_any_present(self):
2028+
if self.failover_sampling_interval_seconds is None:
2029+
raise ValueError('At least one field must be provided')
2030+
return self
2031+
2032+
2033+
class CmapiConfigController:
2034+
@cherrypy.tools.timeit()
2035+
@cherrypy.tools.json_in()
2036+
@cherrypy.tools.json_out()
2037+
@cherrypy.tools.validate_api_key() # pylint: disable=no-member
2038+
def patch_cmapi_config(self):
2039+
"""Update our own CMAPI config section in Columnstore.xml"""
2040+
func_name = 'patch_cmapi_config'
2041+
log_begin(module_logger, func_name)
2042+
2043+
req_model = validate_or_422(
2044+
CmapiConfigPatchModel,
2045+
cherrypy.request.json,
2046+
module_logger,
2047+
func_name,
2048+
prefix='Invalid payload',
2049+
)
2050+
2051+
# Update Columnstore.xml under <CMAPIConfig>
2052+
nc = NodeConfig()
2053+
with nc.modify_config(DEFAULT_MCS_CONF_PATH) as root:
2054+
cmapi_node = helpers.get_or_create_child_xml_node(root, 'CMAPIConfig')
2055+
2056+
# Failover sampling interval
2057+
if req_model.failover_sampling_interval_seconds is not None:
2058+
node = helpers.get_or_create_child_xml_node(cmapi_node, 'FailoverSamplingIntervalSeconds')
2059+
node.text = str(req_model.failover_sampling_interval_seconds)
2060+
2061+
with exc_to_422(module_logger, func_name, prefix='Failed to bump config revision'):
2062+
helpers.update_revision_and_manager(input_config_filename=DEFAULT_MCS_CONF_PATH)
2063+
2064+
# Broadcast updated config
2065+
with cmapi_error_to_422(module_logger, func_name):
2066+
with TransactionManager() as txn:
2067+
helpers.broadcast_new_config(nodes=txn.success_txn_nodes)
2068+
2069+
return {'timestamp': str(datetime.now())}

cmapi/cmapi_server/exceptions.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
from collections.abc import Iterator
44
from contextlib import contextmanager
5-
from typing import Optional
5+
from typing import Optional, TypeVar, Any
6+
7+
from pydantic import BaseModel, ValidationError
68

79
from cmapi_server.controllers.error import APIError
810

@@ -58,3 +60,26 @@ def cmapi_error_to_422(logger, func_name: str) -> Iterator[None]:
5860
# mirror raise_422_error behavior locally to avoid circular imports
5961
logger.error(f'{func_name} {err.message}', exc_info=False)
6062
raise APIError(422, err.message) from err
63+
64+
65+
T = TypeVar('T', bound=BaseModel)
66+
67+
def validate_or_422(
68+
model: type[T], payload: Any, logger, func_name: str,
69+
prefix: Optional[str] = 'Invalid request body',
70+
) -> T:
71+
"""Validate payload with Pydantic model or raise HTTP 422 APIError."""
72+
try:
73+
return model.model_validate(payload)
74+
except ValidationError as exp:
75+
msg = f"{prefix}: {exp.errors()}" if prefix else str(exp.errors())
76+
logger.error(f"{func_name} {msg}", exc_info=False)
77+
raise APIError(422, msg) from exp
78+
79+
80+
@contextmanager
81+
def exc_to_422(logger, func_name: str, prefix: Optional[str] = None) -> Iterator[None]:
82+
"""Convert any exception into HTTP 422"""
83+
with cmapi_error_to_422(logger, func_name):
84+
with exc_to_cmapi_error(prefix=prefix):
85+
yield

cmapi/cmapi_server/helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import aiohttp
2020
import lxml.objectify
21+
from lxml import etree
2122
import requests
2223
from tracing.traced_session import get_traced_session
2324
from tracing.traced_aiohttp import create_traced_async_session
@@ -54,6 +55,14 @@ def get_id() -> int:
5455
return int(random() * 1000000)
5556

5657

58+
def get_or_create_child_xml_node(parent, name: str):
59+
"""Get a direct child by tag name or create it if missing."""
60+
node = parent.find(f'./{name}')
61+
if node is None:
62+
node = etree.SubElement(parent, name)
63+
return node
64+
65+
5766
def start_transaction(
5867
config_filename: str = CMAPI_CONF_PATH,
5968
cs_config_filename: str = DEFAULT_MCS_CONF_PATH,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import json
2+
from contextlib import ExitStack
3+
from unittest.mock import Mock, patch
4+
5+
import requests
6+
from lxml import etree
7+
8+
from cmapi_server.constants import _version
9+
from cmapi_server.test.unittest_global import TEST_API_KEY, BaseServerTestCase
10+
11+
requests.packages.urllib3.disable_warnings()
12+
13+
14+
class TestCmapiConfigEndpoint(BaseServerTestCase):
15+
def setUp(self):
16+
super().setUp()
17+
self._stack = ExitStack()
18+
self.addCleanup(self._stack.close)
19+
20+
# Mock broadcast_new_config, TransactionManager and config path
21+
self.broadcast_mock = Mock()
22+
self._stack.enter_context(patch('cmapi_server.helpers.broadcast_new_config', new=self.broadcast_mock))
23+
self._stack.enter_context(patch('cmapi_server.controllers.endpoints.DEFAULT_MCS_CONF_PATH', new=self.mcs_config_filename))
24+
self._stack.enter_context(patch('cmapi_server.constants.DEFAULT_MCS_CONF_PATH', new=self.mcs_config_filename))
25+
26+
class _FakeTxn:
27+
def __enter__(self):
28+
self.success_txn_nodes = ['n1', 'n2']
29+
return self
30+
31+
def __exit__(self, *a):
32+
return False
33+
34+
self._stack.enter_context(patch('cmapi_server.controllers.endpoints.TransactionManager', new=lambda: _FakeTxn()))
35+
36+
def test_sampling_interval_written(self):
37+
r = self._request_patch(
38+
{
39+
'failover_sampling_interval_seconds': 45,
40+
}
41+
)
42+
self.assertEqual(r.status_code, 200)
43+
44+
tree = etree.parse(str(self.mcs_config_filename))
45+
self.assertEqual(tree.findtext('./CMAPIConfig/FailoverSamplingIntervalSeconds'), '45')
46+
47+
def test_interval_bounds_rejected(self):
48+
r = self._request_patch({'failover_sampling_interval_seconds': 0})
49+
self.assertEqual(r.status_code, 422)
50+
body = r.json()
51+
self.assertIn('error', body)
52+
53+
def test_broadcast_called(self):
54+
r = self._request_patch({'failover_sampling_interval_seconds': 30})
55+
self.assertEqual(r.status_code, 200)
56+
self.broadcast_mock.assert_called_once()
57+
self.assertEqual(self.broadcast_mock.call_args.kwargs.get('nodes'), ['n1', 'n2'])
58+
59+
def _request_patch(self, payload: dict):
60+
url = f'https://localhost:8640/cmapi/{_version}/cmapi_config'
61+
headers = {'x-api-key': TEST_API_KEY, 'Content-Type': 'application/json'}
62+
return requests.patch(url, verify=False, headers=headers, data=json.dumps(payload))

cmapi/cmapi_server/test/test_em_endpoints.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ def run_server():
2626
CertificateManager.create_self_signed_certificate_if_not_exist()
2727
cherrypy.engine.start()
2828
cherrypy.engine.wait(cherrypy.engine.states.STARTED)
29-
yield
30-
cherrypy.engine.exit()
31-
cherrypy.engine.block()
29+
try:
30+
yield
31+
finally:
32+
cherrypy.engine.exit()
33+
cherrypy.engine.block()
3234

3335

34-
def get_current_key():
36+
def get_current_key() -> str:
3537
app_config = configparser.ConfigParser()
3638
try:
3739
with open(cmapi_config_filename, 'r') as _config_file:

cmapi/failover/config.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
import configparser
21
import logging
32
import threading
43
from os.path import getmtime
54

6-
import lxml
7-
8-
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, DEFAULT_SM_CONF_PATH
5+
from cmapi_server.constants import DEFAULT_FAILOVER_SAMPLING_INTERVAL_SECS, DEFAULT_MCS_CONF_PATH
96
from mcs_node_control.models.node_config import NodeConfig
107

118

@@ -18,6 +15,7 @@ class Config:
1815
_inactive_nodes = []
1916
_primary_node = ''
2017
_my_name = None # derived from config file
18+
_failover_sampling_interval = DEFAULT_FAILOVER_SAMPLING_INTERVAL_SECS
2119

2220
config_lock = threading.Lock()
2321
last_mtime = 0
@@ -69,6 +67,13 @@ def getPrimaryNode(self):
6967
self.config_lock.release()
7068
return ret
7169

70+
def getFailoverTimeoutSeconds(self) -> int:
71+
self.config_lock.acquire()
72+
self.check_reload()
73+
ret = self._failover_sampling_interval
74+
self.config_lock.release()
75+
return ret
76+
7277
def check_reload(self):
7378
"""Check config reload.
7479
@@ -157,4 +162,11 @@ def load_config(self):
157162
self._primary_node = primary_node
158163
self.last_mtime = last_mtime
159164
self._my_name = my_name
165+
166+
sampling_interval_node = root.find('./CMAPIConfig/FailoverSamplingIntervalSeconds')
167+
if sampling_interval_node is not None and sampling_interval_node.text is not None:
168+
self._failover_sampling_interval = int(sampling_interval_node.text)
169+
else:
170+
self._failover_sampling_interval = DEFAULT_FAILOVER_SAMPLING_INTERVAL_SECS
171+
160172
return True

cmapi/integration_tests/ssh.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
import json
33
from contextlib import contextmanager
44
from pathlib import Path
5-
from typing import Any, Callable
5+
from typing import Any, Callable, Optional
66

77
from fabric import Connection
88

9-
109
from dataclasses import dataclass, field
1110

1211
@dataclass
@@ -75,8 +74,8 @@ def replicas(self) -> list[RemoteHost]:
7574
def run_on_all_hosts_parallel(
7675
hosts: list[RemoteHost],
7776
func: Callable[[RemoteHost], Any],
78-
timeout: float | None = None,
79-
max_workers: int | None = None,
77+
timeout: Optional[float] = None,
78+
max_workers: Optional[int] = None,
8079
) -> dict[str, Any]:
8180
"""
8281
Run a function on all hosts in parallel.

0 commit comments

Comments
 (0)