Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 155 additions & 60 deletions pydantic_ai_slim/pydantic_ai/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import base64
import hashlib
from abc import ABC, abstractmethod
import warnings
from abc import ABC
from collections.abc import Callable, Sequence
from dataclasses import KW_ONLY, dataclass, field, replace
from datetime import datetime
from mimetypes import guess_type
from mimetypes import guess_extension, guess_type
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal, TypeAlias, cast, overload
Expand Down Expand Up @@ -181,16 +182,22 @@ def identifier(self) -> str:
"""
return self._identifier or _multi_modal_content_identifier(self.url)

@abstractmethod
def _infer_media_type(self) -> str:
"""Infer the media type of the file based on the URL."""
raise NotImplementedError
type = guess_type(self.url)[0]
if type is None:
raise ValueError(
f'Could not infer media type from URL: {self.url}. Explicitly provide a `media_type` instead.'
)
return type

@property
@abstractmethod
def format(self) -> str:
"""The file format."""
raise NotImplementedError
ext = guess_extension(self.media_type)
if ext is None:
raise ValueError(f'Could not infer file format from media type: {self.media_type}')
return ext[1:] # Strip the leading dot

__repr__ = _utils.dataclasses_no_defaults_repr

Expand Down Expand Up @@ -229,7 +236,7 @@ def __init__(
)
self.kind = kind

def _infer_media_type(self) -> VideoMediaType:
def _infer_media_type(self) -> VideoMediaType | str:
"""Return the media type of the video, based on the url."""
if self.url.endswith('.mkv'):
return 'video/x-matroska'
Expand All @@ -253,22 +260,23 @@ def _infer_media_type(self) -> VideoMediaType:
elif self.is_youtube:
return 'video/mp4'
else:
raise ValueError(
f'Could not infer media type from video URL: {self.url}. Explicitly provide a `media_type` instead.'
)
return super()._infer_media_type()

@property
def is_youtube(self) -> bool:
"""True if the URL has a YouTube domain."""
return self.url.startswith(('https://youtu.be/', 'https://youtube.com/', 'https://www.youtube.com/'))

@property
def format(self) -> VideoFormat:
def format(self) -> VideoFormat | str:
"""The file format of the video.

The choice of supported formats were based on the Bedrock Converse API. Other APIs don't require to use a format.
"""
return _video_format_lookup[self.media_type]
if self.media_type in _video_format_lookup:
return _video_format_lookup[self.media_type]
else:
return super().format


@dataclass(init=False, repr=False)
Expand Down Expand Up @@ -305,7 +313,7 @@ def __init__(
)
self.kind = kind

def _infer_media_type(self) -> AudioMediaType:
def _infer_media_type(self) -> AudioMediaType | str:
"""Return the media type of the audio file, based on the url.

References:
Expand All @@ -324,14 +332,15 @@ def _infer_media_type(self) -> AudioMediaType:
if self.url.endswith('.aac'):
return 'audio/aac'

raise ValueError(
f'Could not infer media type from audio URL: {self.url}. Explicitly provide a `media_type` instead.'
)
return super()._infer_media_type()

@property
def format(self) -> AudioFormat:
def format(self) -> AudioFormat | str:
"""The file format of the audio file."""
return _audio_format_lookup[self.media_type]
if self.media_type in _audio_format_lookup:
return _audio_format_lookup[self.media_type]
else:
return super().format


@dataclass(init=False, repr=False)
Expand Down Expand Up @@ -368,7 +377,7 @@ def __init__(
)
self.kind = kind

def _infer_media_type(self) -> ImageMediaType:
def _infer_media_type(self) -> ImageMediaType | str:
"""Return the media type of the image, based on the url."""
if self.url.endswith(('.jpg', '.jpeg')):
return 'image/jpeg'
Expand All @@ -378,18 +387,18 @@ def _infer_media_type(self) -> ImageMediaType:
return 'image/gif'
elif self.url.endswith('.webp'):
return 'image/webp'
else:
raise ValueError(
f'Could not infer media type from image URL: {self.url}. Explicitly provide a `media_type` instead.'
)
return super()._infer_media_type()

@property
def format(self) -> ImageFormat:
def format(self) -> ImageFormat | str:
"""The file format of the image.

The choice of supported formats were based on the Bedrock Converse API. Other APIs don't require to use a format.
"""
return _image_format_lookup[self.media_type]
if self.media_type in _image_format_lookup:
return _image_format_lookup[self.media_type]
else:
return super().format


@dataclass(init=False, repr=False)
Expand Down Expand Up @@ -448,25 +457,19 @@ def _infer_media_type(self) -> str:
return 'application/vnd.ms-excel'
elif self.url.endswith('.xlsx'):
return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

type_, _ = guess_type(self.url)
if type_ is None:
raise ValueError(
f'Could not infer media type from document URL: {self.url}. Explicitly provide a `media_type` instead.'
)
return type_
else:
return super()._infer_media_type()

@property
def format(self) -> DocumentFormat:
def format(self) -> DocumentFormat | str:
"""The file format of the document.

The choice of supported formats were based on the Bedrock Converse API. Other APIs don't require to use a format.
"""
media_type = self.media_type
try:
return _document_format_lookup[media_type]
except KeyError as e:
raise ValueError(f'Unknown document media type: {media_type}') from e
if self.media_type in _document_format_lookup:
return _document_format_lookup[self.media_type]
else:
return super().format


@dataclass(init=False, repr=False)
Expand All @@ -481,7 +484,6 @@ class BinaryContent:

_: KW_ONLY

media_type: AudioMediaType | ImageMediaType | DocumentMediaType | str
"""The media type of the binary data."""

vendor_metadata: dict[str, Any] | None = None
Expand All @@ -496,22 +498,36 @@ class BinaryContent:
compare=False, default=None
)

_media_type: Annotated[str | None, pydantic.Field(alias='media_type', default=None, exclude=True)] = field(
compare=False, default=None
)
_type: Annotated[str | None, pydantic.Field(alias='type', default=None, exclude=True)] = field(
compare=False, default=None
)
_extension: Annotated[str | None, pydantic.Field(alias='extension', default=None, exclude=True)] = field(
compare=False, default=None
)
kind: Literal['binary'] = 'binary'
"""Type identifier, this is available on all parts as a discriminator."""

def __init__(
self,
data: bytes,
*,
media_type: AudioMediaType | ImageMediaType | DocumentMediaType | str,
media_type: str | None = None,
identifier: str | None = None,
vendor_metadata: dict[str, Any] | None = None,
kind: Literal['binary'] = 'binary',
# Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
_identifier: str | None = None,
_media_type: str | None = None,
_type: str | None = None,
_extension: str | None = None,
) -> None:
self.data = data
self.media_type = media_type
self._media_type = media_type or _media_type
self._type = _type
self._extension = _extension
self._identifier = identifier or _identifier
self.vendor_metadata = vendor_metadata
self.kind = kind
Expand Down Expand Up @@ -574,6 +590,56 @@ def identifier(self) -> str:
"""
return self._identifier or _multi_modal_content_identifier(self.data)

@pydantic.computed_field
@property
def media_type(self) -> str:
"""The media type of the binary content.

Automatically detects the media type using:
1. Magika (preferred, more accurate for documents)
2. python-magic (fallback)

Raises:
ImportError: If neither magika nor python-magic is installed.
"""
return self._media_type or self._infer_media_type()

def _infer_media_type(self) -> str:
"""Infer the media type of the binary content.

This method is deprecated. Use `self.media_type` property instead.
"""
try:
# Try Magika first (more accurate, especially for documents)
from magika import Magika

magika = Magika()
result = magika.identify_bytes(self.data).output
self._media_type = result.mime_type
self._type = result.group
self._extension = result.extensions[0] if result.extensions else None
return self._media_type
except ImportError:
pass

# Fallback to python-magic
try:
from magic import Magic

magic = Magic(mime=True)
self._media_type = magic.from_buffer(self.data)
warnings.warn(
'Using magic to identify media_type may result in incorrect identification of some document types. '
'To improve identification, please install the "magika" package.',
category=UserWarning,
stacklevel=2,
)
return self._media_type
except ImportError as e:
raise ImportError(
'To use BinaryContent without providing media_type, please install the "python-magic" or "magika" package.'
) from e

@property
def data_uri(self) -> str:
"""Convert the `BinaryContent` to a data URI."""
Expand All @@ -587,37 +653,51 @@ def base64(self) -> str:
@property
def is_audio(self) -> bool:
"""Return `True` if the media type is an audio type."""
return self.media_type.startswith('audio/')
return self._type == 'audio' or self.media_type.startswith('audio/')

@property
def is_image(self) -> bool:
"""Return `True` if the media type is an image type."""
return self.media_type.startswith('image/')
return self._type == 'image' or self.media_type.startswith('image/')

@property
def is_video(self) -> bool:
"""Return `True` if the media type is a video type."""
return self.media_type.startswith('video/')
return self._type == 'video' or self.media_type.startswith('video/')

@property
def is_document(self) -> bool:
"""Return `True` if the media type is a document type."""
return self.media_type in _document_format_lookup
return self._type == 'document' or self.media_type in _document_format_lookup

@property
def format(self) -> str:
"""The file format of the binary content."""
try:
if self.is_audio:
return _audio_format_lookup[self.media_type]
elif self.is_image:
return _image_format_lookup[self.media_type]
elif self.is_video:
return _video_format_lookup[self.media_type]
else:
return _document_format_lookup[self.media_type]
except KeyError as e:
raise ValueError(f'Unknown media type: {self.media_type}') from e
"""The file format of the binary content.

Returns the file extension (without leading dot) based on the media type.
Uses cached extension from Magika if available, otherwise maps from media_type.

Raises:
ValueError: If file format cannot be inferred from media type.
"""
if self._extension is not None:
return self._extension

# Combine all format lookups
type_map = _image_format_lookup | _audio_format_lookup | _document_format_lookup | _video_format_lookup

if self.media_type in type_map:
self._extension = type_map[self.media_type]
return self._extension

# Fallback to mimetypes.guess_extension
ext = guess_extension(self.media_type)
if ext is None:
raise ValueError(f'Unknown media type: {self.media_type}')

# Cache and return (strip the leading dot)
self._extension = ext[1:]
return self._extension

__repr__ = _utils.dataclasses_no_defaults_repr

Expand All @@ -629,15 +709,27 @@ def __init__(
self,
data: bytes,
*,
media_type: str,
media_type: str | None = None,
identifier: str | None = None,
vendor_metadata: dict[str, Any] | None = None,
# Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
kind: Literal['binary'] = 'binary',
_identifier: str | None = None,
_media_type: str | None = None,
_type: str | None = None,
_extension: str | None = None,
):
# Use _media_type if media_type is not provided (for inline-snapshot compatibility)
effective_media_type = media_type or _media_type

super().__init__(
data=data, media_type=media_type, identifier=identifier or _identifier, vendor_metadata=vendor_metadata
data=data,
media_type=effective_media_type,
identifier=identifier or _identifier,
vendor_metadata=vendor_metadata,
_media_type=_media_type,
_type=_type,
_extension=_extension,
)

if not self.is_image:
Expand Down Expand Up @@ -1396,7 +1488,10 @@ def new_event_body():
elif isinstance(part, TextPart | ThinkingPart):
kind = part.part_kind
body.setdefault('content', []).append(
{'kind': kind, **({'text': part.content} if settings.include_content else {})}
{
'kind': kind,
**({'text': part.content} if settings.include_content else {}),
}
)
elif isinstance(part, FilePart):
body.setdefault('content', []).append(
Expand Down
Loading
Loading