From 70fb373054b0daca12c3d4271122df48a89b7ae3 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 09:18:35 -0400 Subject: [PATCH 01/14] Add health event for slow DBM async jobs --- .../datadog_checks/base/utils/db/health.py | 1 + .../datadog_checks/base/utils/db/utils.py | 22 ++++++++++++++++ .../tests/base/utils/db/test_util.py | 25 +++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index 54c96f5a220a4..befebdebc0cdf 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -28,6 +28,7 @@ class HealthEvent(Enum): """ INITIALIZATION = 'initialization' + MISSED_COLLECTION = 'missed_collection' class HealthStatus(Enum): diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 0c46a26cff82e..b1267bab692ab 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -21,6 +21,7 @@ from datadog_checks.base.agent import datadog_agent from datadog_checks.base.log import get_check_logger from datadog_checks.base.utils.common import to_native_string +from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.types import Transformer # noqa: F401 from datadog_checks.base.utils.format import json from datadog_checks.base.utils.tracing import INTEGRATION_TRACING_SERVICE_NAME, tracing_enabled @@ -293,6 +294,10 @@ def __init__( expected_db_exceptions=(), shutdown_callback=None, job_name=None, + # Some users may want to disable the missed collection event, + # for example if they set the collection interval intentionally low + # to effectively run the job in a loop + enable_missed_collection_event=True, ): self._check = check self._config_host = config_host @@ -314,6 +319,7 @@ def __init__( self._enabled = enabled self._expected_db_exceptions = expected_db_exceptions self._job_name = job_name + self._enable_missed_collection_event = enable_missed_collection_event def cancel(self): """ @@ -342,6 +348,22 @@ def run_job_loop(self, tags): elif self._job_loop_future is None or not self._job_loop_future.running(): self._job_loop_future = DBMAsyncJob.executor.submit(self._job_loop) else: + if time.time() - self._rate_limiter.last_event > self._min_collection_interval: + if self._check.health and self._enable_missed_collection_event: + # Missed a collection interval, submit a health event + self._check.health.submit_health_event( + name=HealthEvent.MISSED_COLLECTION, + status=HealthStatus.WARNING, + tags=self._job_tags, + dbms=self._dbms, + job_name=self._job_name, + collection_interval=self._min_collection_interval, + last_check_run=self._last_check_run, + last_job_loop_time=self._rate_limiter.last_event, + elapsed_time=(time.time() - self._rate_limiter.last_event) * 1000, + ) + self._log.warning("[%s] Missed collection interval", self._job_name) + self._log.debug("Job loop already running. job=%s", self._job_name) def _job_loop(self): diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index 7c51e73856001..d9954c63cd12e 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -13,6 +13,7 @@ from datadog_checks.base import AgentCheck from datadog_checks.base.stubs.datadog_agent import datadog_agent +from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.utils import ( ConstantRateLimiter, DBMAsyncJob, @@ -123,6 +124,30 @@ def test_ratelimiting_ttl_cache(): assert cache.acquire(i), "cache should be empty again so these keys should go in OK" +class HealthCapture: + def __init__(self): + self.events = [] + + def submit_health_event(self, name, status, **kwargs): + self.events.append((name, status, kwargs)) + + +def test_dbm_async_job_missed_collection_interval(): + check = AgentCheck() + check.health = HealthCapture() + job = JobForTesting(check, min_collection_interval=0.1, job_execution_time=1) + job.run_job_loop([]) + time.sleep(0.2) + # Simulate the check calling run_job_loop on its run + job.run_job_loop([]) + job.cancel() + assert len(check.health.events) == 1 + health_event = check.health.events[0] + assert health_event[0] == HealthEvent.MISSED_COLLECTION + assert health_event[1] == HealthStatus.WARNING + assert health_event[2]['job_name'] == 'test-job' + + class DBExceptionForTests(BaseException): pass From 5809b44bc7cc6111da2dcacebb11b4a5f0ab8ec1 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 09:56:59 -0400 Subject: [PATCH 02/14] Add cooldown for health events --- .../datadog_checks/base/utils/db/health.py | 22 ++++++++++- .../datadog_checks/base/utils/db/utils.py | 12 +++--- .../tests/base/utils/db/test_util.py | 38 +++++++++++-------- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index befebdebc0cdf..f391adcfecc38 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -9,6 +9,8 @@ import time from typing import TYPE_CHECKING +from cachetools import TLRUCache + from datadog_checks.base.utils.serialization import json if TYPE_CHECKING: @@ -40,6 +42,9 @@ class HealthStatus(Enum): WARNING = 'warning' ERROR = 'error' +DEFAULT_COOLDOWN = 60*5 +def ttl(_key, value, now): + return now + value class Health: def __init__(self, check: DatabaseCheck): @@ -50,8 +55,9 @@ def __init__(self, check: DatabaseCheck): The check instance that will be used to submit health events. """ self.check = check + self._ttl_cache = TLRUCache(maxsize=1000, ttu=ttl) - def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: list[str] = None, **kwargs): + def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: list[str] = None, cooldown: bool = False, cooldown_time: int = DEFAULT_COOLDOWN, cooldown_keys: list[str] = None, **kwargs): """ Submit a health event to the aggregator. @@ -61,15 +67,27 @@ def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: lis The health status to submit. :param tags: list of str Tags to associate with the health event. + :param cooldown: int + The cooldown period in seconds to prevent the events with the same name and status from being submitted again. + :param cooldown_keys: list of str + Additional kwargs keys to include in the cooldown key. :param kwargs: Additional keyword arguments to include in the event under `data`. """ + category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower() + if cooldown: + cooldown_key = "|".join([category, name.value, status.value]) + if cooldown_keys: + cooldown_key = "|".join([cooldown_key, "|".join([f"{k}={kwargs[k]}" for k in cooldown_keys])]) + if self._ttl_cache.get(cooldown_key, None): + return + self._ttl_cache[cooldown_key] = cooldown_time self.check.event_platform_event( json.dumps( { 'timestamp': time.time() * 1000, 'version': 1, 'check_id': self.check.check_id, - 'category': self.check.__NAMESPACE__ or self.check.__class__.__name__.lower(), + 'category': category, 'name': name, 'status': status, 'tags': tags or [], diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index b1267bab692ab..c12f1dbdba2d3 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -348,19 +348,20 @@ def run_job_loop(self, tags): elif self._job_loop_future is None or not self._job_loop_future.running(): self._job_loop_future = DBMAsyncJob.executor.submit(self._job_loop) else: - if time.time() - self._rate_limiter.last_event > self._min_collection_interval: + if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: if self._check.health and self._enable_missed_collection_event: # Missed a collection interval, submit a health event self._check.health.submit_health_event( name=HealthEvent.MISSED_COLLECTION, status=HealthStatus.WARNING, tags=self._job_tags, + cooldown=True, + cooldown_time=self._min_collection_interval, + cooldown_keys=['dbms', 'job_name'], dbms=self._dbms, job_name=self._job_name, - collection_interval=self._min_collection_interval, - last_check_run=self._last_check_run, - last_job_loop_time=self._rate_limiter.last_event, - elapsed_time=(time.time() - self._rate_limiter.last_event) * 1000, + last_run_start=self._last_run_start, + elapsed_time=(time.time() - self._last_run_start) * 1000, ) self._log.warning("[%s] Missed collection interval", self._job_name) @@ -432,6 +433,7 @@ def _run_sync_job_rate_limited(self): def _run_job_rate_limited(self): try: + self._last_run_start = time.time() self._run_job_traced() except: raise diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index d9954c63cd12e..276f88f43444b 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -13,7 +13,7 @@ from datadog_checks.base import AgentCheck from datadog_checks.base.stubs.datadog_agent import datadog_agent -from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus +from datadog_checks.base.utils.db.health import Health, HealthEvent, HealthStatus from datadog_checks.base.utils.db.utils import ( ConstantRateLimiter, DBMAsyncJob, @@ -124,28 +124,34 @@ def test_ratelimiting_ttl_cache(): assert cache.acquire(i), "cache should be empty again so these keys should go in OK" -class HealthCapture: - def __init__(self): - self.events = [] - def submit_health_event(self, name, status, **kwargs): - self.events.append((name, status, kwargs)) - -def test_dbm_async_job_missed_collection_interval(): +def test_dbm_async_job_missed_collection_interval(aggregator): check = AgentCheck() - check.health = HealthCapture() - job = JobForTesting(check, min_collection_interval=0.1, job_execution_time=1) + health = Health(check) + check.health = health + job = JobForTesting(check, min_collection_interval=0.5, job_execution_time=2) job.run_job_loop([]) - time.sleep(0.2) + # Sleep longer than the target collection interval + time.sleep(0.7) # Simulate the check calling run_job_loop on its run job.run_job_loop([]) + # One more run to check the cooldown + job.run_job_loop([]) job.cancel() - assert len(check.health.events) == 1 - health_event = check.health.events[0] - assert health_event[0] == HealthEvent.MISSED_COLLECTION - assert health_event[1] == HealthStatus.WARNING - assert health_event[2]['job_name'] == 'test-job' + + events = aggregator.get_event_platform_events("dbm-health") + + # The cooldown should prevent the event from being submitted again + assert len(events) == 1 + health_event = events[0] + print(health_event) + assert health_event['name'] == HealthEvent.MISSED_COLLECTION.value + assert health_event['status'] == HealthStatus.WARNING.value + assert health_event['data']['job_name'] == 'test-job' + # This might be flakey, we can adjust the timing if needed + assert health_event['data']['elapsed_time'] > 500 + assert health_event['data']['elapsed_time'] < 1000 class DBExceptionForTests(BaseException): From 3d9d2dd6d9eab20c5459347fb0c5e3759c6b121f Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 09:59:58 -0400 Subject: [PATCH 03/14] Add cooldowns --- .../datadog_checks/base/utils/db/utils.py | 36 ++++++++++--------- .../tests/base/utils/db/test_util.py | 8 ++--- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index c12f1dbdba2d3..786c48e8f62df 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -21,7 +21,7 @@ from datadog_checks.base.agent import datadog_agent from datadog_checks.base.log import get_check_logger from datadog_checks.base.utils.common import to_native_string -from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus +from datadog_checks.base.utils.db.health import DEFAULT_COOLDOWN as DEFAULT_HEALTH_COOLDOWN, HealthEvent, HealthStatus from datadog_checks.base.utils.db.types import Transformer # noqa: F401 from datadog_checks.base.utils.format import json from datadog_checks.base.utils.tracing import INTEGRATION_TRACING_SERVICE_NAME, tracing_enabled @@ -348,22 +348,24 @@ def run_job_loop(self, tags): elif self._job_loop_future is None or not self._job_loop_future.running(): self._job_loop_future = DBMAsyncJob.executor.submit(self._job_loop) else: - if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: - if self._check.health and self._enable_missed_collection_event: - # Missed a collection interval, submit a health event - self._check.health.submit_health_event( - name=HealthEvent.MISSED_COLLECTION, - status=HealthStatus.WARNING, - tags=self._job_tags, - cooldown=True, - cooldown_time=self._min_collection_interval, - cooldown_keys=['dbms', 'job_name'], - dbms=self._dbms, - job_name=self._job_name, - last_run_start=self._last_run_start, - elapsed_time=(time.time() - self._last_run_start) * 1000, - ) - self._log.warning("[%s] Missed collection interval", self._job_name) + if self._min_collection_interval >= 1: + # Assume a collection interval of less than 1 second is an attempt to run the job in a loop + if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: + if self._check.health and self._enable_missed_collection_event: + # Missed a collection interval, submit a health event + self._check.health.submit_health_event( + name=HealthEvent.MISSED_COLLECTION, + status=HealthStatus.WARNING, + tags=self._job_tags, + cooldown=True, + cooldown_time=min(DEFAULT_HEALTH_COOLDOWN, self._min_collection_interval), + cooldown_keys=['dbms', 'job_name'], + dbms=self._dbms, + job_name=self._job_name, + last_run_start=self._last_run_start, + elapsed_time=(time.time() - self._last_run_start) * 1000, + ) + self._log.warning("[%s] Missed collection interval", self._job_name) self._log.debug("Job loop already running. job=%s", self._job_name) diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index 276f88f43444b..d1b73d34bb770 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -130,10 +130,10 @@ def test_dbm_async_job_missed_collection_interval(aggregator): check = AgentCheck() health = Health(check) check.health = health - job = JobForTesting(check, min_collection_interval=0.5, job_execution_time=2) + job = JobForTesting(check, min_collection_interval=1, job_execution_time=3) job.run_job_loop([]) # Sleep longer than the target collection interval - time.sleep(0.7) + time.sleep(1.5) # Simulate the check calling run_job_loop on its run job.run_job_loop([]) # One more run to check the cooldown @@ -150,8 +150,8 @@ def test_dbm_async_job_missed_collection_interval(aggregator): assert health_event['status'] == HealthStatus.WARNING.value assert health_event['data']['job_name'] == 'test-job' # This might be flakey, we can adjust the timing if needed - assert health_event['data']['elapsed_time'] > 500 - assert health_event['data']['elapsed_time'] < 1000 + assert health_event['data']['elapsed_time'] > 1500 + assert health_event['data']['elapsed_time'] < 2000 class DBExceptionForTests(BaseException): From b69973fed82ad955d8267e335854bb76ba1c6767 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 10:14:01 -0400 Subject: [PATCH 04/14] Changelog --- datadog_checks_base/changelog.d/21719.added | 1 + .../datadog_checks/base/utils/db/health.py | 20 ++++++++++++++++--- .../datadog_checks/base/utils/db/utils.py | 3 ++- .../tests/base/utils/db/test_util.py | 4 +--- 4 files changed, 21 insertions(+), 7 deletions(-) create mode 100644 datadog_checks_base/changelog.d/21719.added diff --git a/datadog_checks_base/changelog.d/21719.added b/datadog_checks_base/changelog.d/21719.added new file mode 100644 index 0000000000000..35d23457b8de4 --- /dev/null +++ b/datadog_checks_base/changelog.d/21719.added @@ -0,0 +1 @@ +Add health event for missed DBM async job executions diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index f391adcfecc38..a372ae9578abc 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -42,10 +42,14 @@ class HealthStatus(Enum): WARNING = 'warning' ERROR = 'error' -DEFAULT_COOLDOWN = 60*5 + +DEFAULT_COOLDOWN = 60 * 5 + + def ttl(_key, value, now): return now + value + class Health: def __init__(self, check: DatabaseCheck): """ @@ -57,7 +61,16 @@ def __init__(self, check: DatabaseCheck): self.check = check self._ttl_cache = TLRUCache(maxsize=1000, ttu=ttl) - def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: list[str] = None, cooldown: bool = False, cooldown_time: int = DEFAULT_COOLDOWN, cooldown_keys: list[str] = None, **kwargs): + def submit_health_event( + self, + name: HealthEvent, + status: HealthStatus, + tags: list[str] = None, + cooldown: bool = False, + cooldown_time: int = DEFAULT_COOLDOWN, + cooldown_keys: list[str] = None, + **kwargs, + ): """ Submit a health event to the aggregator. @@ -68,7 +81,8 @@ def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: lis :param tags: list of str Tags to associate with the health event. :param cooldown: int - The cooldown period in seconds to prevent the events with the same name and status from being submitted again. + The cooldown period in seconds to prevent the events with the same name and status + from being submitted again. :param cooldown_keys: list of str Additional kwargs keys to include in the cooldown key. :param kwargs: Additional keyword arguments to include in the event under `data`. diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 786c48e8f62df..9a105c25f6950 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -21,7 +21,8 @@ from datadog_checks.base.agent import datadog_agent from datadog_checks.base.log import get_check_logger from datadog_checks.base.utils.common import to_native_string -from datadog_checks.base.utils.db.health import DEFAULT_COOLDOWN as DEFAULT_HEALTH_COOLDOWN, HealthEvent, HealthStatus +from datadog_checks.base.utils.db.health import DEFAULT_COOLDOWN as DEFAULT_HEALTH_COOLDOWN +from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.types import Transformer # noqa: F401 from datadog_checks.base.utils.format import json from datadog_checks.base.utils.tracing import INTEGRATION_TRACING_SERVICE_NAME, tracing_enabled diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index d1b73d34bb770..c490150ee3095 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -124,12 +124,10 @@ def test_ratelimiting_ttl_cache(): assert cache.acquire(i), "cache should be empty again so these keys should go in OK" - - def test_dbm_async_job_missed_collection_interval(aggregator): check = AgentCheck() health = Health(check) - check.health = health + check.health = health job = JobForTesting(check, min_collection_interval=1, job_execution_time=3) job.run_job_loop([]) # Sleep longer than the target collection interval From 6e155a6ddd1d422fc61d2b16a3ea2286f185493c Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 10:14:32 -0400 Subject: [PATCH 05/14] Clean --- datadog_checks_base/tests/base/utils/db/test_util.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datadog_checks_base/tests/base/utils/db/test_util.py b/datadog_checks_base/tests/base/utils/db/test_util.py index c490150ee3095..084b6982e5ade 100644 --- a/datadog_checks_base/tests/base/utils/db/test_util.py +++ b/datadog_checks_base/tests/base/utils/db/test_util.py @@ -143,7 +143,6 @@ def test_dbm_async_job_missed_collection_interval(aggregator): # The cooldown should prevent the event from being submitted again assert len(events) == 1 health_event = events[0] - print(health_event) assert health_event['name'] == HealthEvent.MISSED_COLLECTION.value assert health_event['status'] == HealthStatus.WARNING.value assert health_event['data']['job_name'] == 'test-job' From 44de499b9185fab038a4dca788b6ca4d60798ea6 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 12:06:30 -0400 Subject: [PATCH 06/14] Tag features in health events --- .../datadog_checks/base/utils/db/utils.py | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 9a105c25f6950..6b1dd45b348a4 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -299,6 +299,10 @@ def __init__( # for example if they set the collection interval intentionally low # to effectively run the job in a loop enable_missed_collection_event=True, + # List of features depenedent on the job running + # Default to [None] so that if no features are specified there will + # still be health events submitted for the job + features=[None], ): self._check = check self._config_host = config_host @@ -321,7 +325,7 @@ def __init__( self._expected_db_exceptions = expected_db_exceptions self._job_name = job_name self._enable_missed_collection_event = enable_missed_collection_event - + self._features = features def cancel(self): """ Send a signal to cancel the job loop asynchronously. @@ -353,19 +357,21 @@ def run_job_loop(self, tags): # Assume a collection interval of less than 1 second is an attempt to run the job in a loop if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: if self._check.health and self._enable_missed_collection_event: - # Missed a collection interval, submit a health event - self._check.health.submit_health_event( - name=HealthEvent.MISSED_COLLECTION, - status=HealthStatus.WARNING, - tags=self._job_tags, - cooldown=True, - cooldown_time=min(DEFAULT_HEALTH_COOLDOWN, self._min_collection_interval), - cooldown_keys=['dbms', 'job_name'], - dbms=self._dbms, - job_name=self._job_name, - last_run_start=self._last_run_start, - elapsed_time=(time.time() - self._last_run_start) * 1000, - ) + # Missed a collection interval, submit a health event for each feature that depends on this job + for feature in self._features: + self._check.health.submit_health_event( + name=HealthEvent.MISSED_COLLECTION, + status=HealthStatus.WARNING, + tags=self._job_tags, + cooldown=True, + cooldown_time=min(DEFAULT_HEALTH_COOLDOWN, self._min_collection_interval), + cooldown_keys=['dbms', 'job_name'], + dbms=self._dbms, + job_name=self._job_name, + last_run_start=self._last_run_start, + elapsed_time=(time.time() - self._last_run_start) * 1000, + feature=feature, + ) self._log.warning("[%s] Missed collection interval", self._job_name) self._log.debug("Job loop already running. job=%s", self._job_name) From f184704f6820c8d19d3ffdadc7ded4eede7b895d Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Tue, 21 Oct 2025 12:10:35 -0400 Subject: [PATCH 07/14] Lint --- datadog_checks_base/datadog_checks/base/utils/db/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 6b1dd45b348a4..4ba0c8ddd64f3 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -300,9 +300,9 @@ def __init__( # to effectively run the job in a loop enable_missed_collection_event=True, # List of features depenedent on the job running - # Default to [None] so that if no features are specified there will + # Defaults to [None] during init so that if no features are specified there will # still be health events submitted for the job - features=[None], + features=None, ): self._check = check self._config_host = config_host @@ -325,7 +325,8 @@ def __init__( self._expected_db_exceptions = expected_db_exceptions self._job_name = job_name self._enable_missed_collection_event = enable_missed_collection_event - self._features = features + self._features = features or [None] + def cancel(self): """ Send a signal to cancel the job loop asynchronously. From 8a2d8fd24bbed3c1916b1813d4767da298e8daea Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Thu, 23 Oct 2025 10:17:49 -0400 Subject: [PATCH 08/14] Fix attr error --- .../datadog_checks/base/utils/db/utils.py | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 4ba0c8ddd64f3..a8f2941e50da2 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -21,7 +21,6 @@ from datadog_checks.base.agent import datadog_agent from datadog_checks.base.log import get_check_logger from datadog_checks.base.utils.common import to_native_string -from datadog_checks.base.utils.db.health import DEFAULT_COOLDOWN as DEFAULT_HEALTH_COOLDOWN from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.types import Transformer # noqa: F401 from datadog_checks.base.utils.format import json @@ -354,25 +353,29 @@ def run_job_loop(self, tags): elif self._job_loop_future is None or not self._job_loop_future.running(): self._job_loop_future = DBMAsyncJob.executor.submit(self._job_loop) else: - if self._min_collection_interval >= 1: + if ( + hasattr(self._check, 'health') + and self._enable_missed_collection_event + and self._min_collection_interval >= 1 + ): # Assume a collection interval of less than 1 second is an attempt to run the job in a loop if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: - if self._check.health and self._enable_missed_collection_event: - # Missed a collection interval, submit a health event for each feature that depends on this job - for feature in self._features: - self._check.health.submit_health_event( - name=HealthEvent.MISSED_COLLECTION, - status=HealthStatus.WARNING, - tags=self._job_tags, - cooldown=True, - cooldown_time=min(DEFAULT_HEALTH_COOLDOWN, self._min_collection_interval), - cooldown_keys=['dbms', 'job_name'], - dbms=self._dbms, - job_name=self._job_name, - last_run_start=self._last_run_start, - elapsed_time=(time.time() - self._last_run_start) * 1000, - feature=feature, - ) + # Missed a collection interval, submit a health event for each feature that depends on this job + for feature in self._features: + self._check.health.submit_health_event( + name=HealthEvent.MISSED_COLLECTION, + status=HealthStatus.WARNING, + tags=self._job_tags, + # Use a cooldown to avoid spamming if the job is missing the collection interval + # in a flappy manner + cooldown=True, + cooldown_keys=['dbms', 'job_name'], + dbms=self._dbms, + job_name=self._job_name, + last_run_start=self._last_run_start, + elapsed_time=(time.time() - self._last_run_start) * 1000, + feature=feature, + ) self._log.warning("[%s] Missed collection interval", self._job_name) self._log.debug("Job loop already running. job=%s", self._job_name) From 76ab2e2ed58dbeb7b6ccf40dff57c5c1a0a80638 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Thu, 23 Oct 2025 13:41:04 -0400 Subject: [PATCH 09/14] cooldown values --- .../datadog_checks/base/utils/db/health.py | 10 +++++----- .../datadog_checks/base/utils/db/utils.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index a372ae9578abc..a11a64788990b 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -68,7 +68,7 @@ def submit_health_event( tags: list[str] = None, cooldown: bool = False, cooldown_time: int = DEFAULT_COOLDOWN, - cooldown_keys: list[str] = None, + cooldown_values: list[str] = None, **kwargs, ): """ @@ -83,15 +83,15 @@ def submit_health_event( :param cooldown: int The cooldown period in seconds to prevent the events with the same name and status from being submitted again. - :param cooldown_keys: list of str - Additional kwargs keys to include in the cooldown key. + :param cooldown_values: list of str + Additional values to include in the cooldown key. :param kwargs: Additional keyword arguments to include in the event under `data`. """ category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower() if cooldown: cooldown_key = "|".join([category, name.value, status.value]) - if cooldown_keys: - cooldown_key = "|".join([cooldown_key, "|".join([f"{k}={kwargs[k]}" for k in cooldown_keys])]) + if cooldown_values: + cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])]) if self._ttl_cache.get(cooldown_key, None): return self._ttl_cache[cooldown_key] = cooldown_time diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index a8f2941e50da2..760d2e8fd2240 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -369,7 +369,7 @@ def run_job_loop(self, tags): # Use a cooldown to avoid spamming if the job is missing the collection interval # in a flappy manner cooldown=True, - cooldown_keys=['dbms', 'job_name'], + cooldown_values=[self._dbms, self._job_name], dbms=self._dbms, job_name=self._job_name, last_run_start=self._last_run_start, From b1d9fd0c0e2a5c01cc6e86d6f15949cf40ce3821 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Thu, 23 Oct 2025 13:47:34 -0400 Subject: [PATCH 10/14] Rename --- .../datadog_checks/base/utils/db/health.py | 6 +++--- .../datadog_checks/base/utils/db/utils.py | 12 +++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index a11a64788990b..34a49e0e24545 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -69,7 +69,7 @@ def submit_health_event( cooldown: bool = False, cooldown_time: int = DEFAULT_COOLDOWN, cooldown_values: list[str] = None, - **kwargs, + data: dict = None, ): """ Submit a health event to the aggregator. @@ -85,7 +85,7 @@ def submit_health_event( from being submitted again. :param cooldown_values: list of str Additional values to include in the cooldown key. - :param kwargs: Additional keyword arguments to include in the event under `data`. + :param data: A dictionary to be submitted as `data`. Must be JSON serializable. """ category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower() if cooldown: @@ -107,7 +107,7 @@ def submit_health_event( 'tags': tags or [], 'ddagentversion': datadog_agent.get_version(), 'ddagenthostname': datadog_agent.get_hostname(), - 'data': {**kwargs}, + 'data': data, } ), "dbm-health", diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 760d2e8fd2240..5da497050536e 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -370,11 +370,13 @@ def run_job_loop(self, tags): # in a flappy manner cooldown=True, cooldown_values=[self._dbms, self._job_name], - dbms=self._dbms, - job_name=self._job_name, - last_run_start=self._last_run_start, - elapsed_time=(time.time() - self._last_run_start) * 1000, - feature=feature, + data={ + "dbms": self._dbms, + "job_name": self._job_name, + "last_run_start": self._last_run_start, + "elapsed_time": (time.time() - self._last_run_start) * 1000, + "feature": feature, + }, ) self._log.warning("[%s] Missed collection interval", self._job_name) From 29b5509ea14418ca162f37e72df3be5c60fd0695 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Thu, 23 Oct 2025 15:18:58 -0400 Subject: [PATCH 11/14] Update datadog_checks_base/datadog_checks/base/utils/db/utils.py Co-authored-by: Eric Weaver --- datadog_checks_base/datadog_checks/base/utils/db/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 5da497050536e..124d040b403a0 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -324,7 +324,9 @@ def __init__( self._expected_db_exceptions = expected_db_exceptions self._job_name = job_name self._enable_missed_collection_event = enable_missed_collection_event - self._features = features or [None] + self._features = features + if self._features is None: + self._features = [None] def cancel(self): """ From 48618e8505a0c953d467f87f84c1c4afdb75a962 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Fri, 24 Oct 2025 13:52:06 -0400 Subject: [PATCH 12/14] Replace log with metric --- datadog_checks_base/datadog_checks/base/utils/db/utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 124d040b403a0..d30d22b73b0f0 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -359,9 +359,11 @@ def run_job_loop(self, tags): hasattr(self._check, 'health') and self._enable_missed_collection_event and self._min_collection_interval >= 1 + and self._last_run_start ): # Assume a collection interval of less than 1 second is an attempt to run the job in a loop - if self._last_run_start and time.time() - self._last_run_start > self._min_collection_interval: + elapsed_time = time.time() - self._last_run_start + if elapsed_time > self._min_collection_interval: # Missed a collection interval, submit a health event for each feature that depends on this job for feature in self._features: self._check.health.submit_health_event( @@ -380,7 +382,9 @@ def run_job_loop(self, tags): "feature": feature, }, ) - self._log.warning("[%s] Missed collection interval", self._job_name) + self._check.count( + "dd.{}.async_job.missed_collection".format(self._dbms), 1, tags=self._job_tags, raw=True + ) self._log.debug("Job loop already running. job=%s", self._job_name) From f28507a1a95cc1da15ab9b4ec4c1632fd874e2f6 Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Fri, 24 Oct 2025 14:30:52 -0400 Subject: [PATCH 13/14] Add lock to health cache --- .../datadog_checks/base/utils/db/health.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index 34a49e0e24545..8dd8b084278d9 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -16,9 +16,10 @@ if TYPE_CHECKING: from datadog_checks.base import DatabaseCheck try: - import datadog_agent + import datadog_agent # type: ignore except ImportError: from datadog_checks.base.stubs import datadog_agent +import threading from enum import Enum @@ -59,6 +60,7 @@ def __init__(self, check: DatabaseCheck): The check instance that will be used to submit health events. """ self.check = check + self._cache_lock = threading.Lock() self._ttl_cache = TLRUCache(maxsize=1000, ttu=ttl) def submit_health_event( @@ -89,12 +91,13 @@ def submit_health_event( """ category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower() if cooldown: - cooldown_key = "|".join([category, name.value, status.value]) - if cooldown_values: - cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])]) - if self._ttl_cache.get(cooldown_key, None): - return - self._ttl_cache[cooldown_key] = cooldown_time + with self._cache_lock: + cooldown_key = "|".join([category, name.value, status.value]) + if cooldown_values: + cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])]) + if self._ttl_cache.get(cooldown_key, None): + return + self._ttl_cache[cooldown_key] = cooldown_time self.check.event_platform_event( json.dumps( { From 0b088f566c1b1d102e0d82454e6df62625eb350f Mon Sep 17 00:00:00 2001 From: Seth Samuel Date: Fri, 24 Oct 2025 14:36:55 -0400 Subject: [PATCH 14/14] Lint --- .../datadog_checks/base/utils/db/health.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/health.py b/datadog_checks_base/datadog_checks/base/utils/db/health.py index 8dd8b084278d9..c43d05bc4038d 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/health.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/health.py @@ -16,12 +16,10 @@ if TYPE_CHECKING: from datadog_checks.base import DatabaseCheck try: - import datadog_agent # type: ignore + import datadog_agent # type: ignore except ImportError: from datadog_checks.base.stubs import datadog_agent import threading - - from enum import Enum @@ -91,10 +89,10 @@ def submit_health_event( """ category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower() if cooldown: + cooldown_key = "|".join([category, name.value, status.value]) + if cooldown_values: + cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])]) with self._cache_lock: - cooldown_key = "|".join([category, name.value, status.value]) - if cooldown_values: - cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])]) if self._ttl_cache.get(cooldown_key, None): return self._ttl_cache[cooldown_key] = cooldown_time