Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(profiling): Continuous profiling lifecycle #4017

Merged
merged 5 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CompressionAlgo(Enum):
from typing import Any
from typing import Sequence
from typing import Tuple
from typing_extensions import Literal
from typing_extensions import TypedDict

from sentry_sdk._types import (
Expand Down Expand Up @@ -528,6 +529,7 @@ def __init__(
profiles_sample_rate=None, # type: Optional[float]
profiles_sampler=None, # type: Optional[TracesSampler]
profiler_mode=None, # type: Optional[ProfilerMode]
profile_lifecycle="manual", # type: Literal["manual", "trace"]
profile_session_sample_rate=None, # type: Optional[float]
auto_enabling_integrations=True, # type: bool
disabled_integrations=None, # type: Optional[Sequence[sentry_sdk.integrations.Integration]]
Expand Down
172 changes: 146 additions & 26 deletions sentry_sdk/profiler/continuous_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import threading
import time
import uuid
from collections import deque
from datetime import datetime, timezone

from sentry_sdk.consts import VERSION
Expand All @@ -27,9 +28,11 @@
if TYPE_CHECKING:
from typing import Any
from typing import Callable
from typing import Deque
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Type
from typing import Union
from typing_extensions import TypedDict
Expand Down Expand Up @@ -120,6 +123,9 @@ def setup_continuous_profiler(options, sdk_info, capture_func):

def try_autostart_continuous_profiler():
# type: () -> None

# TODO: deprecate this as it'll be replaced by the auto lifecycle option

if _scheduler is None:
return

Expand All @@ -129,6 +135,14 @@ def try_autostart_continuous_profiler():
_scheduler.manual_start()


def try_profile_lifecycle_trace_start():
    # type: () -> Union[ContinuousProfile, None]
    """
    Ask the module-level scheduler to auto start a profile.

    Returns the result of the scheduler's `auto_start()`, or `None`
    when `_scheduler` is `None`.
    """
    return None if _scheduler is None else _scheduler.auto_start()


def start_profiler():
# type: () -> None
if _scheduler is None:
Expand Down Expand Up @@ -170,6 +184,14 @@ def determine_profile_session_sampling_decision(sample_rate):
return random.random() < float(sample_rate)


class ContinuousProfile:
    """
    Small handle whose `active` flag starts out `True` and is flipped
    to `False` once `stop()` is called.
    """

    # Class-level default; `stop()` shadows it with an instance attribute.
    active: bool = True

    def stop(self):
        # type: () -> None
        """Mark this profile as no longer active."""
        self.active = False


class ContinuousScheduler:
mode = "unknown" # type: ContinuousProfilerMode

Expand All @@ -179,16 +201,21 @@ def __init__(self, frequency, options, sdk_info, capture_func):
self.options = options
self.sdk_info = sdk_info
self.capture_func = capture_func

self.lifecycle = self.options.get("profile_lifecycle")
profile_session_sample_rate = self.options.get("profile_session_sample_rate")
self.sampled = determine_profile_session_sampling_decision(
profile_session_sample_rate
)

self.sampler = self.make_sampler()
self.buffer = None # type: Optional[ProfileBuffer]
self.pid = None # type: Optional[int]

self.running = False

profile_session_sample_rate = self.options.get("profile_session_sample_rate")
self.sampled = determine_profile_session_sampling_decision(
profile_session_sample_rate
)
self.new_profiles = deque(maxlen=128) # type: Deque[ContinuousProfile]
self.active_profiles = set() # type: Set[ContinuousProfile]

def is_auto_start_enabled(self):
# type: () -> bool
Expand All @@ -207,15 +234,38 @@ def is_auto_start_enabled(self):

return experiments.get("continuous_profiling_auto_start")

def auto_start(self):
    # type: () -> Union[ContinuousProfile, None]
    """
    Register a new profile and make sure the scheduler is running.

    Returns `None` without doing anything when this scheduler is not
    sampled or when its lifecycle is not "trace". Otherwise a fresh
    `ContinuousProfile` is appended to `new_profiles`,
    `ensure_running()` is called, and the profile is returned.
    """
    if not self.sampled or self.lifecycle != "trace":
        return None

    logger.debug("[Profiling] Auto starting profiler")

    started = ContinuousProfile()
    self.new_profiles.append(started)
    self.ensure_running()
    return started

def manual_start(self):
    # type: () -> None
    """
    Call `ensure_running()` when this scheduler is sampled and its
    lifecycle is "manual"; otherwise do nothing.
    """
    if self.sampled and self.lifecycle == "manual":
        self.ensure_running()

def manual_stop(self):
    # type: () -> None
    """
    Call `teardown()` when the lifecycle is "manual"; otherwise do
    nothing.
    """
    if self.lifecycle == "manual":
        self.teardown()

def ensure_running(self):
Expand Down Expand Up @@ -249,28 +299,97 @@ def make_sampler(self):

cache = LRUCache(max_size=256)

def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

ts = now()

try:
sample = [
(str(tid), extract_stack(frame, cache, cwd))
for tid, frame in sys._current_frames().items()
]
except AttributeError:
# For some reason, the frame we get doesn't have certain attributes.
# When this happens, we abandon the current sample as it's bad.
capture_internal_exception(sys.exc_info())
return

if self.buffer is not None:
self.buffer.write(ts, sample)
if self.lifecycle == "trace":

def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

# no profiles taking place, so we can stop early
if not self.new_profiles and not self.active_profiles:
self.running = False
return

# This is the number of profiles we want to pop off.
# It's possible another thread adds a new profile to
# the list and we spend longer than we want inside
# the loop below.
#
# Also make sure to set this value before extracting
# frames so we do not write to any new profiles that
# were started after this point.
new_profiles = len(self.new_profiles)

ts = now()

try:
sample = [
(str(tid), extract_stack(frame, cache, cwd))
for tid, frame in sys._current_frames().items()
]
except AttributeError:
# For some reason, the frame we get doesn't have certain attributes.
# When this happens, we abandon the current sample as it's bad.
capture_internal_exception(sys.exc_info())
return

# Move the new profiles into the active_profiles set.
#
# We cannot directly add them to the active_profiles set
# in `auto_start` because it is called from other
# threads, which can cause a RuntimeError when the
# set size changes during iteration without a lock.
#
# We also want to avoid using a lock here so threads
# that are starting profiles are not blocked until they
# can acquire the lock.
for _ in range(new_profiles):
self.active_profiles.add(self.new_profiles.popleft())
inactive_profiles = []

for profile in self.active_profiles:
if profile.active:
pass
else:
# If a profile is marked inactive, we buffer it
# to `inactive_profiles` so it can be removed.
# We cannot remove it here as it would result
# in a RuntimeError.
inactive_profiles.append(profile)

for profile in inactive_profiles:
self.active_profiles.remove(profile)

if self.buffer is not None:
self.buffer.write(ts, sample)

else:

def _sample_stack(*args, **kwargs):
# type: (*Any, **Any) -> None
"""
Take a sample of the stack on all the threads in the process.
This should be called at a regular interval to collect samples.
"""

ts = now()

try:
sample = [
(str(tid), extract_stack(frame, cache, cwd))
for tid, frame in sys._current_frames().items()
]
except AttributeError:
# For some reason, the frame we get doesn't have certain attributes.
# When this happens, we abandon the current sample as it's bad.
capture_internal_exception(sys.exc_info())
return

if self.buffer is not None:
self.buffer.write(ts, sample)

return _sample_stack

Expand All @@ -294,6 +413,7 @@ def run(self):

if self.buffer is not None:
self.buffer.flush()
self.buffer = None


class ThreadContinuousScheduler(ContinuousScheduler):
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/profiler/transaction_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def _sample_stack(*args, **kwargs):
if profile.active:
profile.write(now, sample)
else:
# If a thread is marked inactive, we buffer it
# If a profile is marked inactive, we buffer it
# to `inactive_profiles` so it can be removed.
# We cannot remove it here as it would result
# in a RuntimeError.
Expand Down
14 changes: 13 additions & 1 deletion sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from sentry_sdk.attachments import Attachment
from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, FALSE_VALUES, INSTRUMENTER
from sentry_sdk.feature_flags import FlagBuffer, DEFAULT_FLAG_CAPACITY
from sentry_sdk.profiler.continuous_profiler import try_autostart_continuous_profiler
from sentry_sdk.profiler.continuous_profiler import (
get_profiler_id,
try_autostart_continuous_profiler,
try_profile_lifecycle_trace_start,
)
from sentry_sdk.profiler.transaction_profiler import Profile
from sentry_sdk.session import Session
from sentry_sdk.tracing_utils import (
Expand Down Expand Up @@ -1051,6 +1055,14 @@ def start_transaction(

transaction._profile = profile

transaction._continuous_profile = try_profile_lifecycle_trace_start()

# Typically, the profiler id is set when the transaction is created. But when
# using the "trace" lifecycle, the profiler isn't running when the first
# transaction is started. So make sure we update the profiler id on it.
if transaction._continuous_profile is not None:
transaction.set_profiler_id(get_profiler_id())

# we don't bother to keep spans if we already know we're not going to
# send the transaction
max_spans = (client.options["_experiments"].get("max_spans")) or 1000
Expand Down
12 changes: 8 additions & 4 deletions sentry_sdk/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
P = ParamSpec("P")
R = TypeVar("R")

import sentry_sdk.profiler
from sentry_sdk.profiler.continuous_profiler import ContinuousProfile
from sentry_sdk.profiler.transaction_profiler import Profile
from sentry_sdk._types import (
Event,
MeasurementUnit,
Expand Down Expand Up @@ -767,6 +768,7 @@ class Transaction(Span):
"_measurements",
"_contexts",
"_profile",
"_continuous_profile",
"_baggage",
)

Expand All @@ -788,9 +790,8 @@ def __init__( # type: ignore[misc]
self.parent_sampled = parent_sampled
self._measurements = {} # type: Dict[str, MeasurementValue]
self._contexts = {} # type: Dict[str, Any]
self._profile = (
None
) # type: Optional[sentry_sdk.profiler.transaction_profiler.Profile]
self._profile = None # type: Optional[Profile]
self._continuous_profile = None # type: Optional[ContinuousProfile]
self._baggage = baggage

def __repr__(self):
Expand Down Expand Up @@ -843,6 +844,9 @@ def __exit__(self, ty, value, tb):
if self._profile is not None:
self._profile.__exit__(ty, value, tb)

if self._continuous_profile is not None:
self._continuous_profile.stop()

super().__exit__(ty, value, tb)

@property
Expand Down
Loading
Loading