chore(llmobs): dac strip io from vertex #13693


Open · wants to merge 15 commits into main
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
@@ -122,9 +122,7 @@ ddtrace/internal/_exceptions.py @DataDog/asm-python
ddtrace/internal/appsec/ @DataDog/asm-python
ddtrace/internal/iast/ @DataDog/asm-python
tests/appsec/ @DataDog/asm-python
tests/contrib/dbapi/test_dbapi_appsec.py @DataDog/asm-python
tests/contrib/subprocess @DataDog/asm-python
tests/contrib/flask/test_flask_appsec.py @DataDog/asm-python
tests/snapshots/tests*appsec*.json @DataDog/asm-python
tests/contrib/*/test*appsec*.py @DataDog/asm-python
scripts/iast/* @DataDog/asm-python
@@ -177,6 +175,8 @@ tests/contrib/crewai @DataDog/ml-observ
tests/contrib/openai_agents @DataDog/ml-observability
tests/contrib/litellm @DataDog/ml-observability
.gitlab/tests/llmobs.yml @DataDog/ml-observability
# MLObs snapshot tests
tests/snapshots/tests.contrib.vertexai.* @DataDog/apm-python @DataDog/ml-observability
Collaborator

I don't think apm-python needs to be included on this

Contributor Author

Ended up pulling this into its own PR to reassign all the LLMObs integration test snapshots, with your feedback integrated.


# Remote Config
ddtrace/internal/remoteconfig @DataDog/remote-config @DataDog/apm-core-python
165 changes: 0 additions & 165 deletions ddtrace/contrib/internal/vertexai/_utils.py
@@ -1,13 +1,5 @@
import sys

from vertexai.generative_models import GenerativeModel
from vertexai.generative_models import Part

from ddtrace.internal.utils import get_argument_value
from ddtrace.llmobs._integrations.utils import get_generation_config_google
from ddtrace.llmobs._integrations.utils import get_system_instructions_from_google_model
from ddtrace.llmobs._integrations.utils import tag_request_content_part_google
from ddtrace.llmobs._integrations.utils import tag_response_part_google
Comment on lines -3 to -10
Contributor

Looks like we are still using these helpers in the Google generative AI integration. Once we remove the APM tagging there, we can get rid of these helpers altogether :)

from ddtrace.llmobs._utils import _get_attr


@@ -44,7 +36,6 @@ def __iter__(self):
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
tag_stream_response(self._dd_span, self._chunks, self._dd_integration)
if self._dd_integration.is_pc_sampled_llmobs(self._dd_span):
self._kwargs["instance"] = self._model_instance
self._kwargs["history"] = self._history
@@ -74,7 +65,6 @@ async def __aiter__(self):
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
tag_stream_response(self._dd_span, self._chunks, self._dd_integration)
if self._dd_integration.is_pc_sampled_llmobs(self._dd_span):
self._kwargs["instance"] = self._model_instance
self._kwargs["history"] = self._history
@@ -95,158 +85,3 @@ def extract_info_from_parts(parts):
if function_call is not None:
function_calls.append(function_call)
return concatenated_text, function_calls


def _tag_response_parts(span, integration, parts):
text, function_calls = extract_info_from_parts(parts)
span.set_tag_str(
"vertexai.response.candidates.%d.content.parts.%d.text" % (0, 0),
integration.trunc(str(text)),
)
for idx, function_call in enumerate(function_calls):
span.set_tag_str(
"vertexai.response.candidates.%d.content.parts.%d.function_calls.%d.function_call.name" % (0, 0, idx),
_get_attr(function_call, "name", ""),
)
span.set_tag_str(
"vertexai.response.candidates.%d.content.parts.%d.function_calls.%d.function_call.args" % (0, 0, idx),
integration.trunc(str(_get_attr(function_call, "args", ""))),
)


def tag_stream_response(span, chunks, integration):
all_parts = []
role = ""
for chunk in chunks:
candidates = _get_attr(chunk, "candidates", [])
for candidate_idx, candidate in enumerate(candidates):
finish_reason = _get_attr(candidate, "finish_reason", None)
if finish_reason:
span.set_tag_str(
"vertexai.response.candidates.%d.finish_reason" % (candidate_idx),
_get_attr(finish_reason, "name", ""),
)
candidate_content = _get_attr(candidate, "content", {})
role = role or _get_attr(candidate_content, "role", "")
if not integration.is_pc_sampled_span(span):
continue
parts = _get_attr(candidate_content, "parts", [])
all_parts.extend(parts)
token_counts = _get_attr(chunk, "usage_metadata", None)
if not token_counts:
continue
span.set_metric("vertexai.response.usage.prompt_tokens", _get_attr(token_counts, "prompt_token_count", 0))
span.set_metric(
"vertexai.response.usage.completion_tokens", _get_attr(token_counts, "candidates_token_count", 0)
)
span.set_metric("vertexai.response.usage.total_tokens", _get_attr(token_counts, "total_token_count", 0))
# streamed responses have only a single candidate, so there is only one role to be tagged
span.set_tag_str("vertexai.response.candidates.0.content.role", str(role))
_tag_response_parts(span, integration, all_parts)


def _tag_request_content(span, integration, content, content_idx):
"""Tag the generation span with request contents."""
if isinstance(content, str):
span.set_tag_str("vertexai.request.contents.%d.text" % content_idx, integration.trunc(content))
return
if isinstance(content, dict):
role = content.get("role", "")
if role:
span.set_tag_str("vertexai.request.contents.%d.role" % content_idx, role)
parts = content.get("parts", [])
for part_idx, part in enumerate(parts):
tag_request_content_part_google("vertexai", span, integration, part, part_idx, content_idx)
return
if isinstance(content, Part):
tag_request_content_part_google("vertexai", span, integration, content, 0, content_idx)
return
role = _get_attr(content, "role", "")
if role:
span.set_tag_str("vertexai.request.contents.%d.role" % content_idx, str(role))
parts = _get_attr(content, "parts", [])
if not parts:
span.set_tag_str(
"vertexai.request.contents.%d.text" % content_idx,
integration.trunc("[Non-text content object: {}]".format(repr(content))),
)
return
for part_idx, part in enumerate(parts):
tag_request_content_part_google("vertexai", span, integration, part, part_idx, content_idx)


def tag_request(span, integration, instance, args, kwargs, is_chat):
"""Tag the generation span with request details.
Includes capturing generation configuration, system prompt, and messages.
"""
# instance is either a chat session or a model itself
model_instance = instance if isinstance(instance, GenerativeModel) else instance._model
contents = get_argument_value(args, kwargs, 0, "content" if is_chat else "contents")
history = _get_attr(instance, "_history", [])
if history:
if isinstance(contents, list):
contents = history + contents
if isinstance(contents, Part) or isinstance(contents, str) or isinstance(contents, dict):
contents = history + [contents]
generation_config = get_generation_config_google(model_instance, kwargs)
generation_config_dict = None
if generation_config is not None:
generation_config_dict = (
generation_config if isinstance(generation_config, dict) else generation_config.to_dict()
)
system_instructions = get_system_instructions_from_google_model(model_instance)
stream = kwargs.get("stream", None)

if generation_config_dict is not None:
for k, v in generation_config_dict.items():
span.set_tag_str("vertexai.request.generation_config.%s" % k, str(v))

if stream:
span.set_tag("vertexai.request.stream", True)

if not integration.is_pc_sampled_span(span):
return

for idx, text in enumerate(system_instructions):
span.set_tag_str(
"vertexai.request.system_instruction.%d.text" % idx,
integration.trunc(str(text)),
)

if isinstance(contents, str):
span.set_tag_str("vertexai.request.contents.0.text", integration.trunc(str(contents)))
return
elif isinstance(contents, Part):
tag_request_content_part_google("vertexai", span, integration, contents, 0, 0)
return
elif not isinstance(contents, list):
return
for content_idx, content in enumerate(contents):
_tag_request_content(span, integration, content, content_idx)


def tag_response(span, generations, integration):
"""Tag the generation span with response details.
Includes capturing generation text, roles, finish reasons, and token counts.
"""
generations_dict = generations.to_dict()
candidates = generations_dict.get("candidates", [])
for candidate_idx, candidate in enumerate(candidates):
finish_reason = _get_attr(candidate, "finish_reason", None)
if finish_reason:
span.set_tag_str("vertexai.response.candidates.%d.finish_reason" % candidate_idx, finish_reason)
candidate_content = _get_attr(candidate, "content", None)
role = _get_attr(candidate_content, "role", "")
span.set_tag_str("vertexai.response.candidates.%d.content.role" % candidate_idx, str(role))
if not integration.is_pc_sampled_span(span):
continue
parts = _get_attr(candidate_content, "parts", [])
for part_idx, part in enumerate(parts):
tag_response_part_google("vertexai", span, integration, part, part_idx, candidate_idx)

token_counts = generations_dict.get("usage_metadata", None)
if not token_counts:
return
span.set_metric("vertexai.response.usage.prompt_tokens", _get_attr(token_counts, "prompt_token_count", 0))
span.set_metric("vertexai.response.usage.completion_tokens", _get_attr(token_counts, "candidates_token_count", 0))
span.set_metric("vertexai.response.usage.total_tokens", _get_attr(token_counts, "total_token_count", 0))
9 changes: 3 additions & 6 deletions ddtrace/contrib/internal/vertexai/patch.py
@@ -4,14 +4,15 @@

import vertexai

# Force the generative_models module to load
from vertexai.generative_models import GenerativeModel # noqa:F401
Contributor Author

Removing this import (which used to live in ./_utils.py but is no longer used there) breaks tests. As far as I can tell, it may be related to Vertex AI lazily loading its submodules, meaning that .generative_models may not exist yet when patching.
If this is a known issue that we have a standard way to deal with, let me know.

Contributor

Hmm interesting, I would probably put this import in the patch function directly (as long as that works) so that we only import it when we actually do the patching.
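Something like this, as a rough sketch — the _datadog_patch flag and the wrapping step are illustrative placeholders rather than the module's actual contents; the point is only where the import lives:

import vertexai


def patch():
    # Importing here forces the lazily-loaded generative_models submodule to
    # materialize right before we wrap its classes, instead of at module import time.
    from vertexai.generative_models import GenerativeModel  # noqa:F401

    if getattr(vertexai, "_datadog_patch", False):
        return
    vertexai._datadog_patch = True
    # ... wrap GenerativeModel.generate_content and the chat methods here ...


def unpatch():
    if not getattr(vertexai, "_datadog_patch", False):
        return
    vertexai._datadog_patch = False
    # ... unwrap here ...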

Contributor Author

Still fails. Looking closer, it actually fails on the assertion that we are unpatched before calling .patch, since the attribute we check isn't imported yet. There might be a way to move this into the test itself (a little tricky since it's the shared base patch tests, not the Vertex AI ones). Do you think that would be worth it?


from ddtrace import config
from ddtrace.contrib.internal.trace_utils import unwrap
from ddtrace.contrib.internal.trace_utils import with_traced_module
from ddtrace.contrib.internal.trace_utils import wrap
from ddtrace.contrib.internal.vertexai._utils import TracedAsyncVertexAIStreamResponse
from ddtrace.contrib.internal.vertexai._utils import TracedVertexAIStreamResponse
from ddtrace.contrib.internal.vertexai._utils import tag_request
from ddtrace.contrib.internal.vertexai._utils import tag_response
from ddtrace.llmobs._integrations import VertexAIIntegration
from ddtrace.llmobs._integrations.utils import extract_model_name_google
from ddtrace.trace import Pin
@@ -69,13 +70,11 @@ def _traced_generate(vertexai, pin, func, instance, args, kwargs, model_instance
# history must be copied since it is modified during the LLM interaction
history = getattr(instance, "history", [])[:]
try:
tag_request(span, integration, instance, args, kwargs, is_chat)
generations = func(*args, **kwargs)
if stream:
return TracedVertexAIStreamResponse(
generations, model_instance, integration, span, args, kwargs, is_chat, history
)
tag_response(span, generations, integration)
except Exception:
span.set_exc_info(*sys.exc_info())
raise
@@ -104,13 +103,11 @@ async def _traced_agenerate(vertexai, pin, func, instance, args, kwargs, model_i
# history must be copied since it is modified during the LLM interaction
history = getattr(instance, "history", [])[:]
try:
tag_request(span, integration, instance, args, kwargs, is_chat)
generations = await func(*args, **kwargs)
if stream:
return TracedAsyncVertexAIStreamResponse(
generations, model_instance, integration, span, args, kwargs, is_chat, history
)
tag_response(span, generations, integration)
except Exception:
span.set_exc_info(*sys.exc_info())
raise
14 changes: 14 additions & 0 deletions ddtrace/llmobs/_integrations/utils.py
@@ -186,6 +186,20 @@ def get_llmobs_metrics_tags(integration_name, span):
return usage


def parse_llmobs_metric_args(metrics):
usage = {}
input_tokens = _get_attr(metrics, "prompt_tokens", None)
output_tokens = _get_attr(metrics, "completion_tokens", None)
total_tokens = _get_attr(metrics, "total_tokens", None)
if input_tokens is not None:
usage[INPUT_TOKENS_METRIC_KEY] = input_tokens
if output_tokens is not None:
usage[OUTPUT_TOKENS_METRIC_KEY] = output_tokens
if total_tokens is not None:
usage[TOTAL_TOKENS_METRIC_KEY] = total_tokens
return usage


def get_system_instructions_from_google_model(model_instance):
"""
Extract system instructions from model and convert to []str for tagging.
39 changes: 37 additions & 2 deletions ddtrace/llmobs/_integrations/vertexai.py
@@ -7,15 +7,17 @@
from ddtrace.internal.utils import ArgumentError
from ddtrace.internal.utils import get_argument_value
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import extract_message_from_part_google
from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
from ddtrace.llmobs._integrations.utils import get_system_instructions_from_google_model
from ddtrace.llmobs._integrations.utils import llmobs_get_metadata_google
from ddtrace.llmobs._utils import _get_attr
@@ -43,6 +45,7 @@ def _llmobs_set_tags(
) -> None:
instance = kwargs.get("instance", None)
history = kwargs.get("history", [])
metrics = kwargs.get("metrics", {})
metadata = llmobs_get_metadata_google(kwargs, instance)

system_instruction = get_system_instructions_from_google_model(instance)
@@ -56,6 +59,7 @@
output_messages = [{"content": ""}]
if response is not None:
output_messages = self._extract_output_message(response)
metrics = self._extract_metrics_from_response(response)

span._set_ctx_items(
{
@@ -65,10 +69,41 @@
METADATA: metadata,
INPUT_MESSAGES: input_messages,
OUTPUT_MESSAGES: output_messages,
METRICS: get_llmobs_metrics_tags("vertexai", span),
METRICS: metrics,
}
)

def _extract_metrics_from_response(self, response):
"""Extract metrics from the response."""
if isinstance(response, list):
for chunk in response:
token_counts = _get_attr(chunk, "usage_metadata", None)
if not token_counts:
continue
input_tokens = _get_attr(token_counts, "prompt_token_count", 0)
output_tokens = _get_attr(token_counts, "candidates_token_count", 0)
total_tokens = _get_attr(token_counts, "total_token_count", 0)
else:
generations_dict = response.to_dict()

token_counts = generations_dict.get("usage_metadata", None)
if not token_counts:
return

input_tokens = _get_attr(token_counts, "prompt_token_count", 0)
output_tokens = _get_attr(token_counts, "candidates_token_count", 0)
total_tokens = _get_attr(token_counts, "total_token_count", 0)

metrics = {}
if input_tokens is not None:
metrics[INPUT_TOKENS_METRIC_KEY] = input_tokens
if output_tokens is not None:
metrics[OUTPUT_TOKENS_METRIC_KEY] = output_tokens
if total_tokens is not None:
metrics[TOTAL_TOKENS_METRIC_KEY] = total_tokens

return metrics

def _extract_input_message(self, contents, history, system_instruction=None):
from vertexai.generative_models._generative_models import Part

@@ -10,29 +10,19 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.p.tid": "67378d4d00000000",
"_dd.p.tid": "685ae96d00000000",
"language": "python",
"runtime-id": "4bb0a91fcf15428fa3998e8daa98b1dc",
"vertexai.request.contents.0.text": "Why do bears hibernate?",
"vertexai.request.generation_config.max_output_tokens": "30",
"vertexai.request.generation_config.stop_sequences": "['x']",
"vertexai.request.generation_config.temperature": "1.0",
"runtime-id": "51dd90093585456fa5287bb1606924df",
"vertexai.request.model": "gemini-1.5-flash",
"vertexai.request.provider": "google",
"vertexai.response.candidates.0.content.parts.0.text": "Bears hibernate to conserve energy and survive during winter months when food is scarce.\\n",
"vertexai.response.candidates.0.content.role": "model",
"vertexai.response.candidates.0.finish_reason": "STOP"
"vertexai.request.provider": "google"
},
"metrics": {
"_dd.measured": 1,
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"process_id": 87069,
"vertexai.response.usage.completion_tokens": 16,
"vertexai.response.usage.prompt_tokens": 14,
"vertexai.response.usage.total_tokens": 30
"process_id": 93922
},
"duration": 338000,
"start": 1731693901811611000
"duration": 172000,
"start": 1750788461546503000
}]]