1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/21719.added
@@ -0,0 +1 @@
Add health event for missed DBM async job executions
50 changes: 42 additions & 8 deletions datadog_checks_base/datadog_checks/base/utils/db/health.py
@@ -9,16 +9,17 @@
import time
from typing import TYPE_CHECKING

from cachetools import TLRUCache

from datadog_checks.base.utils.serialization import json

if TYPE_CHECKING:
    from datadog_checks.base import DatabaseCheck
try:
    import datadog_agent  # type: ignore
except ImportError:
    from datadog_checks.base.stubs import datadog_agent


import threading
from enum import Enum


@@ -28,6 +29,7 @@ class HealthEvent(Enum):
"""

INITIALIZATION = 'initialization'
MISSED_COLLECTION = 'missed_collection'


class HealthStatus(Enum):
@@ -40,6 +42,13 @@ class HealthStatus(Enum):
    ERROR = 'error'


DEFAULT_COOLDOWN = 60 * 5


def ttl(_key, value, now):
    # TTU callback for TLRUCache: the cached value is the cooldown in seconds,
    # so each entry expires that many seconds after insertion.
    return now + value


class Health:
    def __init__(self, check: DatabaseCheck):
        """
@@ -49,8 +58,19 @@ def __init__(self, check: DatabaseCheck):
            The check instance that will be used to submit health events.
        """
        self.check = check
        self._cache_lock = threading.Lock()
        self._ttl_cache = TLRUCache(maxsize=1000, ttu=ttl)

    def submit_health_event(
        self,
        name: HealthEvent,
        status: HealthStatus,
        tags: list[str] = None,
        cooldown: bool = False,
        cooldown_time: int = DEFAULT_COOLDOWN,
        cooldown_values: list[str] = None,
        data: dict = None,
    ):
"""
Submit a health event to the aggregator.

Expand All @@ -60,21 +80,35 @@ def submit_health_event(self, name: HealthEvent, status: HealthStatus, tags: lis
The health status to submit.
:param tags: list of str
Tags to associate with the health event.
:param kwargs: Additional keyword arguments to include in the event under `data`.
:param cooldown: int
The cooldown period in seconds to prevent the events with the same name and status
from being submitted again.
:param cooldown_values: list of str
Additional values to include in the cooldown key.
:param data: A dictionary to be submitted as `data`. Must be JSON serializable.
"""
        category = self.check.__NAMESPACE__ or self.check.__class__.__name__.lower()
        if cooldown:
            # Build a key like "mysql|missed_collection|warning|mysql|query-metrics";
            # while it is present in the TTL cache, identical events are dropped.
            cooldown_key = "|".join([category, name.value, status.value])
            if cooldown_values:
                cooldown_key = "|".join([cooldown_key, "|".join([f"{v}" for v in cooldown_values])])
            with self._cache_lock:
                if self._ttl_cache.get(cooldown_key, None):
                    return
                self._ttl_cache[cooldown_key] = cooldown_time
        self.check.event_platform_event(
            json.dumps(
                {
                    'timestamp': time.time() * 1000,
                    'version': 1,
                    'check_id': self.check.check_id,
                    'category': category,
                    'name': name,
                    'status': status,
                    'tags': tags or [],
                    'ddagentversion': datadog_agent.get_version(),
                    'ddagenthostname': datadog_agent.get_hostname(),
                    'data': data,
                }
            ),
            "dbm-health",
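The cooldown above leans on `cachetools.TLRUCache`, whose `ttu` callback computes a per-entry expiry from the stored value, so writing `cooldown_time` as the value makes each key self-expire once its cooldown lapses. A minimal standalone sketch of that mechanic (the key and the 2-second cooldown are illustrative):

```python
import time

from cachetools import TLRUCache


def ttl(_key, value, now):
    # Expire each entry `value` seconds after it is inserted.
    return now + value


cache = TLRUCache(maxsize=1000, ttu=ttl)

cache["mysql|missed_collection|warning"] = 2  # start a 2-second cooldown
assert cache.get("mysql|missed_collection|warning") is not None  # duplicate suppressed
time.sleep(2.1)
assert cache.get("mysql|missed_collection|warning") is None  # cooldown elapsed; next event goes through
```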
45 changes: 45 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -21,6 +21,7 @@
from datadog_checks.base.agent import datadog_agent
from datadog_checks.base.log import get_check_logger
from datadog_checks.base.utils.common import to_native_string
from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus
from datadog_checks.base.utils.db.types import Transformer # noqa: F401
from datadog_checks.base.utils.format import json
from datadog_checks.base.utils.tracing import INTEGRATION_TRACING_SERVICE_NAME, tracing_enabled
@@ -293,6 +294,14 @@ def __init__(
        expected_db_exceptions=(),
        shutdown_callback=None,
        job_name=None,
        # Some users may want to disable the missed collection event,
        # for example if they set the collection interval intentionally low
        # to effectively run the job in a loop
        enable_missed_collection_event=True,
        # List of features dependent on the job running.
        # Defaults to [None] during init so that health events are still
        # submitted for the job even when no features are specified.
        features=None,
    ):
        self._check = check
        self._config_host = config_host
@@ -314,6 +323,10 @@ def __init__(
        self._enabled = enabled
        self._expected_db_exceptions = expected_db_exceptions
        self._job_name = job_name
        self._enable_missed_collection_event = enable_missed_collection_event
        self._features = features
        if self._features is None:
            self._features = [None]
        # Set here so the check in run_job_loop is safe before the first
        # execution of the job records a start time.
        self._last_run_start = None

    def cancel(self):
        """
@@ -342,6 +355,37 @@ def run_job_loop(self, tags):
        elif self._job_loop_future is None or not self._job_loop_future.running():
            self._job_loop_future = DBMAsyncJob.executor.submit(self._job_loop)
        else:
            if (
                hasattr(self._check, 'health')
                and self._enable_missed_collection_event
                # Assume a collection interval of less than 1 second is an attempt
                # to run the job in a loop
                and self._min_collection_interval >= 1
                and self._last_run_start
            ):
                elapsed_time = time.time() - self._last_run_start
                if elapsed_time > self._min_collection_interval:
                    # Missed a collection interval; submit a health event for each
                    # feature that depends on this job
                    for feature in self._features:
                        self._check.health.submit_health_event(
                            name=HealthEvent.MISSED_COLLECTION,
                            status=HealthStatus.WARNING,
                            tags=self._job_tags,
                            # Use a cooldown to avoid spamming if the job misses its
                            # collection interval in a flappy manner
                            cooldown=True,
                            cooldown_values=[self._dbms, self._job_name],
                            data={
                                "dbms": self._dbms,
                                "job_name": self._job_name,
                                "last_run_start": self._last_run_start,
                                "elapsed_time": elapsed_time * 1000,
                                "feature": feature,
                            },
                        )
                    self._check.count(
                        "dd.{}.async_job.missed_collection".format(self._dbms), 1, tags=self._job_tags, raw=True
                    )

            self._log.debug("Job loop already running. job=%s", self._job_name)

    def _job_loop(self):
@@ -410,6 +454,7 @@ def _run_sync_job_rate_limited(self):

    def _run_job_rate_limited(self):
        try:
            # Record the start of the run so the next run_job_loop call can
            # detect a missed collection interval.
            self._last_run_start = time.time()
            self._run_job_traced()
        except:
            raise
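For a sense of how an integration would opt in, here is a hypothetical job subclass; the class name, `dbms` value, interval, and feature names are invented for the example, and the keyword arguments follow the constructor shown above:

```python
from datadog_checks.base.utils.db.utils import DBMAsyncJob


class QueryMetricsJob(DBMAsyncJob):
    """Hypothetical DBM job whose missed runs affect two downstream features."""

    def __init__(self, check):
        super(QueryMetricsJob, self).__init__(
            check,
            run_sync=False,
            enabled=True,
            min_collection_interval=10,
            dbms="postgres",
            rate_limit=1 / 10,
            job_name="query-metrics",
            # One MISSED_COLLECTION event is emitted per feature when a run is skipped.
            features=["query_metrics", "query_activity"],
            # Jobs that deliberately use a sub-second interval to run in a loop can
            # set this to False (sub-second intervals are also exempted automatically).
            enable_missed_collection_event=True,
        )

    def run_job(self):
        pass  # collection logic goes here
```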
28 changes: 28 additions & 0 deletions datadog_checks_base/tests/base/utils/db/test_util.py
@@ -13,6 +13,7 @@

from datadog_checks.base import AgentCheck
from datadog_checks.base.stubs.datadog_agent import datadog_agent
from datadog_checks.base.utils.db.health import Health, HealthEvent, HealthStatus
from datadog_checks.base.utils.db.utils import (
    ConstantRateLimiter,
    DBMAsyncJob,
@@ -123,6 +124,33 @@ def test_ratelimiting_ttl_cache():
        assert cache.acquire(i), "cache should be empty again so these keys should go in OK"


def test_dbm_async_job_missed_collection_interval(aggregator):
    check = AgentCheck()
    health = Health(check)
    check.health = health
    job = JobForTesting(check, min_collection_interval=1, job_execution_time=3)
    job.run_job_loop([])
    # Sleep longer than the target collection interval
    time.sleep(1.5)
    # Simulate the check calling run_job_loop on its run
    job.run_job_loop([])
    # One more run to check the cooldown
    job.run_job_loop([])
    job.cancel()

    events = aggregator.get_event_platform_events("dbm-health")

    # The cooldown should prevent the event from being submitted again
    assert len(events) == 1
    health_event = events[0]
    assert health_event['name'] == HealthEvent.MISSED_COLLECTION.value
    assert health_event['status'] == HealthStatus.WARNING.value
    assert health_event['data']['job_name'] == 'test-job'
    # This might be flaky; we can adjust the timing if needed
    assert health_event['data']['elapsed_time'] > 1500
    assert health_event['data']['elapsed_time'] < 2000


class DBExceptionForTests(BaseException):
    pass

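`JobForTesting` is defined earlier in this test module, outside the visible diff. A stand-in consistent with how the test above uses it might look like the following; the constructor arguments and the `'test-job'` name are inferred from the test, not copied from the real helper:

```python
import time

from datadog_checks.base.utils.db.utils import DBMAsyncJob


class JobForTesting(DBMAsyncJob):
    def __init__(self, check, min_collection_interval=10, job_execution_time=0):
        super(JobForTesting, self).__init__(
            check,
            run_sync=False,
            enabled=True,
            min_collection_interval=min_collection_interval,
            dbms="fake-dbms",
            rate_limit=10,
            job_name="test-job",
        )
        self._job_execution_time = job_execution_time

    def run_job(self):
        # Sleep through the collection interval so the next run_job_loop call
        # sees the job still running and reports a missed collection.
        time.sleep(self._job_execution_time)
```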