Skip to content

Commit f0af53e

Browse files
authored
distro: handle dynamic tracing sampling rate from config (#367)
* distro: handle dynamic tracing sampling rate from config This will be available in stack 9.2. * Silence pyright warnings * Update FIXME text
1 parent 4911954 commit f0af53e

File tree

3 files changed

+192
-18
lines changed

3 files changed

+192
-18
lines changed

src/elasticotel/distro/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from elasticotel.distro import version
5656
from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
5757
from elasticotel.distro.resource_detectors import get_cloud_resource_detectors
58-
from elasticotel.distro.config import opamp_handler
58+
from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE
5959

6060

6161
logger = logging.getLogger(__name__)
@@ -152,7 +152,7 @@ def _configure(self, **kwargs):
152152
# preference to use DELTA temporality as we can handle only this kind of Histograms
153153
os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA")
154154
os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio")
155-
os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, "1.0")
155+
os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE))
156156

157157
base_resource_detectors = [
158158
"process_runtime",

src/elasticotel/distro/config.py

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
import logging
1818

19+
from opentelemetry import trace
20+
21+
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
1922
from opentelemetry._opamp import messages
2023
from opentelemetry._opamp.agent import OpAMPAgent
2124
from opentelemetry._opamp.client import OpAMPClient
@@ -34,32 +37,79 @@
3437
"off": 1000,
3538
}
3639

40+
DEFAULT_SAMPLING_RATE = 1.0
41+
42+
43+
def _handle_logging_level(config) -> str:
44+
error_message = ""
45+
# when config option has default value you don't get it so need to handle the default
46+
config_logging_level = config.get("logging_level")
47+
if config_logging_level is not None:
48+
logging_level = _LOG_LEVELS_MAP.get(config_logging_level) # type: ignore[reportArgumentType]
49+
else:
50+
logging_level = logging.INFO
51+
52+
if logging_level is None:
53+
logger.error("Logging level not handled: %s", config_logging_level)
54+
error_message = f"Logging level not handled: {config_logging_level}"
55+
else:
56+
# update upstream and distro logging levels
57+
logging.getLogger("opentelemetry").setLevel(logging_level)
58+
logging.getLogger("elasticotel").setLevel(logging_level)
59+
return error_message
60+
61+
62+
def _handle_sampling_rate(config) -> str:
63+
config_sampling_rate = config.get("sampling_rate")
64+
sampling_rate = DEFAULT_SAMPLING_RATE
65+
if config_sampling_rate is not None:
66+
try:
67+
sampling_rate = float(config_sampling_rate)
68+
if sampling_rate < 0 or sampling_rate > 1.0:
69+
raise ValueError()
70+
except ValueError:
71+
logger.error("Invalid `sampling_rate` from config `%s`", config_sampling_rate)
72+
return f"Invalid sampling_rate {config_sampling_rate}"
73+
74+
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
75+
if sampler is None:
76+
logger.debug("Cannot get sampler from tracer provider.")
77+
return ""
78+
79+
# FIXME: this needs to be updated for the consistent probability samplers
80+
if not isinstance(sampler, ParentBasedTraceIdRatio):
81+
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
82+
return ""
83+
84+
# since sampler is parent based we need to update its root sampler
85+
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
86+
if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue]
87+
# we don't have a proper way to update it :)
88+
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
89+
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
90+
logger.debug("Updated sampler rate to %s", sampling_rate)
91+
return ""
92+
3793

3894
def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
3995
# we check config_hash because we need to track last received config and remote_config seems to be always truthy
4096
if not message.remote_config or not message.remote_config.config_hash:
4197
return
4298

43-
error_message = ""
99+
error_messages = []
44100
for config_filename, config in messages._decode_remote_config(message.remote_config):
45101
# we don't have standardized config values so limit to configs coming from our backend
46102
if config_filename == "elastic":
47103
logger.debug("Config %s: %s", config_filename, config)
48-
# when config option has default value you don't get it so need to handle the default
49-
config_logging_level = config.get("logging_level")
50-
if config_logging_level is not None:
51-
logging_level = _LOG_LEVELS_MAP.get(config_logging_level) # type: ignore[reportArgumentType]
52-
else:
53-
logging_level = logging.INFO
54-
55-
if logging_level is None:
56-
logger.warning("Logging level not handled: %s", config_logging_level)
57-
error_message = f"Logging level not handled: {config_logging_level}"
58-
else:
59-
# update upstream and distro logging levels
60-
logging.getLogger("opentelemetry").setLevel(logging_level)
61-
logging.getLogger("elasticotel").setLevel(logging_level)
104+
error_message = _handle_logging_level(config)
105+
if error_message:
106+
error_messages.append(error_message)
107+
108+
error_message = _handle_sampling_rate(config)
109+
if error_message:
110+
error_messages.append(error_message)
62111

112+
error_message = "\n".join(error_messages)
63113
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
64114
updated_remote_config = client._update_remote_config_status(
65115
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message

tests/distro/test_distro.py

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,134 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock):
325325
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
326326
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
327327

328-
with self.assertLogs(config_logger, logging.WARNING):
328+
with self.assertLogs(config_logger, logging.ERROR) as cm:
329329
opamp_handler(agent, client, message)
330+
self.assertEqual(cm.output, ["ERROR:elasticotel.distro.config:Logging level not handled: unexpected"])
330331

331332
client._build_remote_config_status_response_message.assert_called_once_with(
332333
client._update_remote_config_status()
333334
)
334335
agent.send.assert_called_once_with(payload=mock.ANY)
336+
337+
@mock.patch("opentelemetry.trace.get_tracer_provider")
338+
def test_sets_matching_sampling_rate(self, get_tracer_provider_mock):
339+
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
340+
get_tracer_provider_mock.return_value.sampler = sampler
341+
agent = mock.Mock()
342+
client = mock.Mock()
343+
config = opamp_pb2.AgentConfigMap()
344+
config.config_map["elastic"].body = json.dumps({"sampling_rate": "0.5"}).encode()
345+
config.config_map["elastic"].content_type = "application/json"
346+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
347+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
348+
opamp_handler(agent, client, message)
349+
350+
self.assertEqual(sampler._root.rate, 0.5)
351+
352+
client._update_remote_config_status.assert_called_once_with(
353+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
354+
)
355+
client._build_remote_config_status_response_message.assert_called_once_with(
356+
client._update_remote_config_status()
357+
)
358+
agent.send.assert_called_once_with(payload=mock.ANY)
359+
360+
@mock.patch("opentelemetry.trace.get_tracer_provider")
361+
def test_sets_sampling_rate_to_default_info_without_sampling_rate_entry_in_config(self, get_tracer_provider_mock):
362+
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
363+
get_tracer_provider_mock.return_value.sampler = sampler
364+
agent = mock.Mock()
365+
client = mock.Mock()
366+
config = opamp_pb2.AgentConfigMap()
367+
config.config_map["elastic"].body = json.dumps({}).encode()
368+
config.config_map["elastic"].content_type = "application/json"
369+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
370+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
371+
opamp_handler(agent, client, message)
372+
373+
self.assertEqual(sampler._root.rate, 1.0)
374+
375+
client._update_remote_config_status.assert_called_once_with(
376+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
377+
)
378+
client._build_remote_config_status_response_message.assert_called_once_with(
379+
client._update_remote_config_status()
380+
)
381+
agent.send.assert_called_once_with(payload=mock.ANY)
382+
383+
@mock.patch("opentelemetry.trace.get_tracer_provider")
384+
def test_warns_if_sampling_rate_value_is_invalid(self, get_tracer_provider_mock):
385+
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
386+
get_tracer_provider_mock.return_value.sampler = sampler
387+
agent = mock.Mock()
388+
client = mock.Mock()
389+
config = opamp_pb2.AgentConfigMap()
390+
config.config_map["elastic"].body = json.dumps({"sampling_rate": "unexpected"}).encode()
391+
config.config_map["elastic"].content_type = "application/json"
392+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
393+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
394+
395+
with self.assertLogs(config_logger, logging.ERROR) as cm:
396+
opamp_handler(agent, client, message)
397+
self.assertEqual(
398+
cm.output, ["ERROR:elasticotel.distro.config:Invalid `sampling_rate` from config `unexpected`"]
399+
)
400+
401+
client._update_remote_config_status.assert_called_once_with(
402+
remote_config_hash=b"1234",
403+
status=opamp_pb2.RemoteConfigStatuses_FAILED,
404+
error_message="Invalid sampling_rate unexpected",
405+
)
406+
client._build_remote_config_status_response_message.assert_called_once_with(
407+
client._update_remote_config_status()
408+
)
409+
agent.send.assert_called_once_with(payload=mock.ANY)
410+
411+
@mock.patch("opentelemetry.trace.get_tracer_provider")
412+
def test_warns_if_sampler_is_not_what_we_expect(self, get_tracer_provider_mock):
413+
get_tracer_provider_mock.return_value.sampler = 5
414+
agent = mock.Mock()
415+
client = mock.Mock()
416+
config = opamp_pb2.AgentConfigMap()
417+
config.config_map["elastic"].body = json.dumps({"sampling_rate": "1.0"}).encode()
418+
config.config_map["elastic"].content_type = "application/json"
419+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
420+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
421+
422+
with self.assertLogs(config_logger, logging.WARNING) as cm:
423+
opamp_handler(agent, client, message)
424+
self.assertEqual(
425+
cm.output,
426+
["WARNING:elasticotel.distro.config:Sampler <class 'int'> is not supported, not applying sampling_rate."],
427+
)
428+
429+
client._update_remote_config_status.assert_called_once_with(
430+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
431+
)
432+
client._build_remote_config_status_response_message.assert_called_once_with(
433+
client._update_remote_config_status()
434+
)
435+
agent.send.assert_called_once_with(payload=mock.ANY)
436+
437+
@mock.patch("opentelemetry.trace.get_tracer_provider")
438+
def test_ignores_tracer_provider_without_a_sampler(self, get_tracer_provider_mock):
439+
get_tracer_provider_mock.return_value.sampler = None
440+
agent = mock.Mock()
441+
client = mock.Mock()
442+
config = opamp_pb2.AgentConfigMap()
443+
config.config_map["elastic"].body = json.dumps({"sampling_rate": "1.0"}).encode()
444+
config.config_map["elastic"].content_type = "application/json"
445+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
446+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
447+
448+
with self.assertLogs(config_logger, logging.DEBUG) as cm:
449+
opamp_handler(agent, client, message)
450+
self.assertIn("DEBUG:elasticotel.distro.config:Cannot get sampler from tracer provider.", cm.output)
451+
452+
client._update_remote_config_status.assert_called_once_with(
453+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
454+
)
455+
client._build_remote_config_status_response_message.assert_called_once_with(
456+
client._update_remote_config_status()
457+
)
458+
agent.send.assert_called_once_with(payload=mock.ANY)

0 commit comments

Comments
 (0)