diff --git a/examples/rpc.py b/examples/rpc.py index 005701fa..6aee34ae 100644 --- a/examples/rpc.py +++ b/examples/rpc.py @@ -85,7 +85,7 @@ async def main(): def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room): - @greeters_room.local_participant.register_rpc_method("arrival") + @greeters_room.register_rpc_method("arrival") async def arrival_method( data: RpcInvocationData, ): @@ -93,7 +93,7 @@ async def arrival_method( await asyncio.sleep(2) return "Welcome and have a wonderful day!" - @math_genius_room.local_participant.register_rpc_method("square-root") + @math_genius_room.register_rpc_method("square-root") async def square_root_method( data: RpcInvocationData, ): @@ -110,7 +110,7 @@ async def square_root_method( print(f"[Math Genius] Aha! It's {result}") return json.dumps({"result": result}) - @math_genius_room.local_participant.register_rpc_method("divide") + @math_genius_room.register_rpc_method("divide") async def divide_method( data: RpcInvocationData, ): @@ -122,7 +122,7 @@ async def divide_method( result = dividend / divisor return json.dumps({"result": result}) - @math_genius_room.local_participant.register_rpc_method("long-calculation") + @math_genius_room.register_rpc_method("long-calculation") async def long_calculation_method( data: RpcInvocationData, ): diff --git a/livekit-rtc/livekit/rtc/_proto/rpc_pb2.py b/livekit-rtc/livekit/rtc/_proto/rpc_pb2.py index eeb1f9ff..14fc4e24 100644 --- a/livekit-rtc/livekit/rtc/_proto/rpc_pb2.py +++ b/livekit-rtc/livekit/rtc/_proto/rpc_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\rlivekit.proto\"7\n\x08RpcError\x12\x0c\n\x04\x63ode\x18\x01 \x02(\r\x12\x0f\n\x07message\x18\x02 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"\x91\x01\n\x11PerformRpcRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x1c\n\x14\x64\x65stination_identity\x18\x02 \x02(\t\x12\x0e\n\x06method\x18\x03 \x02(\t\x12\x0f\n\x07payload\x18\x04 \x02(\t\x12\x1b\n\x13response_timeout_ms\x18\x05 \x01(\r\"L\n\x18RegisterRpcMethodRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x0e\n\x06method\x18\x02 \x02(\t\"N\n\x1aUnregisterRpcMethodRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x0e\n\x06method\x18\x02 \x02(\t\"\x96\x01\n\"RpcMethodInvocationResponseRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x15\n\rinvocation_id\x18\x02 \x02(\x04\x12\x0f\n\x07payload\x18\x03 \x01(\t\x12&\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x17.livekit.proto.RpcError\"&\n\x12PerformRpcResponse\x12\x10\n\x08\x61sync_id\x18\x01 \x02(\x04\"\x1b\n\x19RegisterRpcMethodResponse\"\x1d\n\x1bUnregisterRpcMethodResponse\"4\n#RpcMethodInvocationResponseResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t\"_\n\x12PerformRpcCallback\x12\x10\n\x08\x61sync_id\x18\x01 \x02(\x04\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12&\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x17.livekit.proto.RpcError\"\xbe\x01\n\x18RpcMethodInvocationEvent\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x15\n\rinvocation_id\x18\x02 \x02(\x04\x12\x0e\n\x06method\x18\x03 \x02(\t\x12\x12\n\nrequest_id\x18\x04 \x02(\t\x12\x17\n\x0f\x63\x61ller_identity\x18\x05 \x02(\t\x12\x0f\n\x07payload\x18\x06 \x02(\t\x12\x1b\n\x13response_timeout_ms\x18\x07 \x02(\rB\x10\xaa\x02\rLiveKit.Proto') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\trpc.proto\x12\rlivekit.proto\"7\n\x08RpcError\x12\x0c\n\x04\x63ode\x18\x01 \x02(\r\x12\x0f\n\x07message\x18\x02 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"\x91\x01\n\x11PerformRpcRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x1c\n\x14\x64\x65stination_identity\x18\x02 \x02(\t\x12\x0e\n\x06method\x18\x03 \x02(\t\x12\x0f\n\x07payload\x18\x04 \x02(\t\x12\x1b\n\x13response_timeout_ms\x18\x05 \x01(\r\"?\n\x18RegisterRpcMethodRequest\x12\x13\n\x0broom_handle\x18\x01 \x02(\x04\x12\x0e\n\x06method\x18\x02 \x02(\t\"A\n\x1aUnregisterRpcMethodRequest\x12\x13\n\x0broom_handle\x18\x01 \x02(\x04\x12\x0e\n\x06method\x18\x02 \x02(\t\"\x96\x01\n\"RpcMethodInvocationResponseRequest\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x15\n\rinvocation_id\x18\x02 \x02(\x04\x12\x0f\n\x07payload\x18\x03 \x01(\t\x12&\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x17.livekit.proto.RpcError\"&\n\x12PerformRpcResponse\x12\x10\n\x08\x61sync_id\x18\x01 \x02(\x04\"\x1b\n\x19RegisterRpcMethodResponse\"\x1d\n\x1bUnregisterRpcMethodResponse\"4\n#RpcMethodInvocationResponseResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t\"_\n\x12PerformRpcCallback\x12\x10\n\x08\x61sync_id\x18\x01 \x02(\x04\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12&\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x17.livekit.proto.RpcError\"\xbe\x01\n\x18RpcMethodInvocationEvent\x12 \n\x18local_participant_handle\x18\x01 \x02(\x04\x12\x15\n\rinvocation_id\x18\x02 \x02(\x04\x12\x0e\n\x06method\x18\x03 \x02(\t\x12\x12\n\nrequest_id\x18\x04 \x02(\t\x12\x17\n\x0f\x63\x61ller_identity\x18\x05 \x02(\t\x12\x0f\n\x07payload\x18\x06 \x02(\t\x12\x1b\n\x13response_timeout_ms\x18\x07 \x02(\rB\x10\xaa\x02\rLiveKit.Proto') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -27,21 +27,21 @@ _globals['_PERFORMRPCREQUEST']._serialized_start=86 _globals['_PERFORMRPCREQUEST']._serialized_end=231 _globals['_REGISTERRPCMETHODREQUEST']._serialized_start=233 - _globals['_REGISTERRPCMETHODREQUEST']._serialized_end=309 - _globals['_UNREGISTERRPCMETHODREQUEST']._serialized_start=311 - _globals['_UNREGISTERRPCMETHODREQUEST']._serialized_end=389 - _globals['_RPCMETHODINVOCATIONRESPONSEREQUEST']._serialized_start=392 - _globals['_RPCMETHODINVOCATIONRESPONSEREQUEST']._serialized_end=542 - _globals['_PERFORMRPCRESPONSE']._serialized_start=544 - _globals['_PERFORMRPCRESPONSE']._serialized_end=582 - _globals['_REGISTERRPCMETHODRESPONSE']._serialized_start=584 - _globals['_REGISTERRPCMETHODRESPONSE']._serialized_end=611 - _globals['_UNREGISTERRPCMETHODRESPONSE']._serialized_start=613 - _globals['_UNREGISTERRPCMETHODRESPONSE']._serialized_end=642 - _globals['_RPCMETHODINVOCATIONRESPONSERESPONSE']._serialized_start=644 - _globals['_RPCMETHODINVOCATIONRESPONSERESPONSE']._serialized_end=696 - _globals['_PERFORMRPCCALLBACK']._serialized_start=698 - _globals['_PERFORMRPCCALLBACK']._serialized_end=793 - _globals['_RPCMETHODINVOCATIONEVENT']._serialized_start=796 - _globals['_RPCMETHODINVOCATIONEVENT']._serialized_end=986 + _globals['_REGISTERRPCMETHODREQUEST']._serialized_end=296 + _globals['_UNREGISTERRPCMETHODREQUEST']._serialized_start=298 + _globals['_UNREGISTERRPCMETHODREQUEST']._serialized_end=363 + _globals['_RPCMETHODINVOCATIONRESPONSEREQUEST']._serialized_start=366 + _globals['_RPCMETHODINVOCATIONRESPONSEREQUEST']._serialized_end=516 + _globals['_PERFORMRPCRESPONSE']._serialized_start=518 + _globals['_PERFORMRPCRESPONSE']._serialized_end=556 + _globals['_REGISTERRPCMETHODRESPONSE']._serialized_start=558 + _globals['_REGISTERRPCMETHODRESPONSE']._serialized_end=585 + _globals['_UNREGISTERRPCMETHODRESPONSE']._serialized_start=587 + _globals['_UNREGISTERRPCMETHODRESPONSE']._serialized_end=616 + _globals['_RPCMETHODINVOCATIONRESPONSERESPONSE']._serialized_start=618 + _globals['_RPCMETHODINVOCATIONRESPONSERESPONSE']._serialized_end=670 + _globals['_PERFORMRPCCALLBACK']._serialized_start=672 + _globals['_PERFORMRPCCALLBACK']._serialized_end=767 + _globals['_RPCMETHODINVOCATIONEVENT']._serialized_start=770 + _globals['_RPCMETHODINVOCATIONEVENT']._serialized_end=960 # @@protoc_insertion_point(module_scope) diff --git a/livekit-rtc/livekit/rtc/_proto/rpc_pb2.pyi b/livekit-rtc/livekit/rtc/_proto/rpc_pb2.pyi index 20330912..9d426942 100644 --- a/livekit-rtc/livekit/rtc/_proto/rpc_pb2.pyi +++ b/livekit-rtc/livekit/rtc/_proto/rpc_pb2.pyi @@ -79,18 +79,18 @@ global___PerformRpcRequest = PerformRpcRequest class RegisterRpcMethodRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - LOCAL_PARTICIPANT_HANDLE_FIELD_NUMBER: builtins.int + ROOM_HANDLE_FIELD_NUMBER: builtins.int METHOD_FIELD_NUMBER: builtins.int - local_participant_handle: builtins.int + room_handle: builtins.int method: builtins.str def __init__( self, *, - local_participant_handle: builtins.int | None = ..., + room_handle: builtins.int | None = ..., method: builtins.str | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["local_participant_handle", b"local_participant_handle", "method", b"method"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["local_participant_handle", b"local_participant_handle", "method", b"method"]) -> None: ... + def HasField(self, field_name: typing.Literal["method", b"method", "room_handle", b"room_handle"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["method", b"method", "room_handle", b"room_handle"]) -> None: ... global___RegisterRpcMethodRequest = RegisterRpcMethodRequest @@ -98,18 +98,18 @@ global___RegisterRpcMethodRequest = RegisterRpcMethodRequest class UnregisterRpcMethodRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - LOCAL_PARTICIPANT_HANDLE_FIELD_NUMBER: builtins.int + ROOM_HANDLE_FIELD_NUMBER: builtins.int METHOD_FIELD_NUMBER: builtins.int - local_participant_handle: builtins.int + room_handle: builtins.int method: builtins.str def __init__( self, *, - local_participant_handle: builtins.int | None = ..., + room_handle: builtins.int | None = ..., method: builtins.str | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["local_participant_handle", b"local_participant_handle", "method", b"method"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["local_participant_handle", b"local_participant_handle", "method", b"method"]) -> None: ... + def HasField(self, field_name: typing.Literal["method", b"method", "room_handle", b"room_handle"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["method", b"method", "room_handle", b"room_handle"]) -> None: ... global___UnregisterRpcMethodRequest = UnregisterRpcMethodRequest diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index cee74375..e2e4519d 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -15,12 +15,13 @@ from __future__ import annotations import ctypes -import asyncio import os import mimetypes import aiofiles -from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast +import weakref +from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, TYPE_CHECKING from abc import abstractmethod, ABC +from deprecated import deprecated from ._ffi_client import FfiClient, FfiHandle from ._proto import ffi_pb2 as proto_ffi @@ -43,8 +44,6 @@ ) from .transcription import Transcription from .rpc import RpcError -from ._proto.rpc_pb2 import RpcMethodInvocationResponseRequest -from .log import logger from .rpc import RpcInvocationData from .data_stream import ( @@ -54,6 +53,9 @@ STREAM_CHUNK_SIZE, ) +if TYPE_CHECKING: + from .room import Room + class PublishTrackError(Exception): def __init__(self, message: str) -> None: @@ -151,13 +153,16 @@ def __init__( self, room_queue: BroadcastQueue[proto_ffi.FfiEvent], owned_info: proto_participant.OwnedParticipant, + room: Room, ) -> None: super().__init__(owned_info) self._room_queue = room_queue self._track_publications: dict[str, LocalTrackPublication] = {} # type: ignore - self._rpc_handlers: Dict[ - str, Callable[[RpcInvocationData], Union[Awaitable[str], str]] - ] = {} + self._room_ref = weakref.ref(room) + + @property + def room(self) -> Room | None: + return self._room_ref() @property def track_publications(self) -> Mapping[str, LocalTrackPublication]: @@ -325,12 +330,15 @@ async def perform_rpc( return cb.perform_rpc.payload + @deprecated(reason="Use room.register_rpc_method instead.") def register_rpc_method( self, method_name: str, handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None, ) -> Union[None, Callable]: """ + Deprecated + Establishes the participant as a receiver for calls of the specified RPC method. Can be used either as a decorator or a regular method. @@ -365,35 +373,24 @@ async def greet_handler(data: RpcInvocationData) -> str: room.local_participant.register_rpc_method('greet', greet_handler) """ + room = self.room + if room is not None: + return room.register_rpc_method(method_name, handler) + return None - def register(handler_func): - self._rpc_handlers[method_name] = handler_func - req = proto_ffi.FfiRequest() - req.register_rpc_method.local_participant_handle = self._ffi_handle.handle - req.register_rpc_method.method = method_name - FfiClient.instance.request(req) - - if handler is not None: - register(handler) - return None - else: - # Called as a decorator - return register - + @deprecated(reason="Use room.unregister_rpc_method instead.") def unregister_rpc_method(self, method: str) -> None: """ + Deprecated + Unregisters a previously registered RPC method. Args: method (str): The name of the RPC method to unregister """ - self._rpc_handlers.pop(method, None) - - req = proto_ffi.FfiRequest() - req.unregister_rpc_method.local_participant_handle = self._ffi_handle.handle - req.unregister_rpc_method.method = method - - FfiClient.instance.request(req) + room = self.room + if room is not None: + room.unregister_rpc_method(method) def set_track_subscription_permissions( self, @@ -417,72 +414,6 @@ def set_track_subscription_permissions( req.set_track_subscription_permissions.permissions.extend(participant_permissions) FfiClient.instance.request(req) - async def _handle_rpc_method_invocation( - self, - invocation_id: int, - method: str, - request_id: str, - caller_identity: str, - payload: str, - response_timeout: float, - ) -> None: - response_error: Optional[RpcError] = None - response_payload: Optional[str] = None - - params = RpcInvocationData(request_id, caller_identity, payload, response_timeout) - - handler = self._rpc_handlers.get(method) - - if not handler: - response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD) - else: - try: - if asyncio.iscoroutinefunction(handler): - async_handler = cast(Callable[[RpcInvocationData], Awaitable[str]], handler) - - async def run_handler(): - try: - return await async_handler(params) - except asyncio.CancelledError: - # This will be caught by the outer try-except if it's due to timeout - raise - - try: - response_payload = await asyncio.wait_for( - run_handler(), timeout=response_timeout - ) - except asyncio.TimeoutError: - raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT) - except asyncio.CancelledError: - raise RpcError._built_in(RpcError.ErrorCode.RECIPIENT_DISCONNECTED) - else: - sync_handler = cast(Callable[[RpcInvocationData], str], handler) - response_payload = sync_handler(params) - except RpcError as error: - response_error = error - except Exception as error: - logger.exception( - f"Uncaught error returned by RPC handler for {method}. " - "Returning APPLICATION_ERROR instead. " - f"Original error: {error}" - ) - response_error = RpcError._built_in(RpcError.ErrorCode.APPLICATION_ERROR) - - req = proto_ffi.FfiRequest( - rpc_method_invocation_response=RpcMethodInvocationResponseRequest( - local_participant_handle=self._ffi_handle.handle, - invocation_id=invocation_id, - error=response_error._to_proto() if response_error else None, - payload=response_payload, - ) - ) - - res = FfiClient.instance.request(req) - - if res.rpc_method_invocation_response.error: - message = res.rpc_method_invocation_response.error - logger.exception(f"error sending rpc method invocation response: {message}") - async def set_metadata(self, metadata: str) -> None: """ Set the metadata for the local participant. diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 1a6cf622..81382d12 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -17,7 +17,7 @@ import ctypes import logging from dataclasses import dataclass, field -from typing import Callable, Dict, Literal, Optional, cast, Mapping +from typing import Callable, Dict, Literal, Optional, cast, Mapping, Union, Awaitable from .event_emitter import EventEmitter from ._ffi_client import FfiClient, FfiHandle @@ -26,10 +26,12 @@ from ._proto import room_pb2 as proto_room from ._proto.room_pb2 import ConnectionState from ._proto.track_pb2 import TrackKind -from ._proto.rpc_pb2 import RpcMethodInvocationEvent +from ._proto.rpc_pb2 import RpcMethodInvocationEvent, RpcMethodInvocationResponseRequest + from ._utils import BroadcastQueue from .e2ee import E2EEManager, E2EEOptions from .participant import LocalParticipant, Participant, RemoteParticipant +from .rpc import RpcInvocationData, RpcError from .track import RemoteAudioTrack, RemoteVideoTrack from .track_publication import RemoteTrackPublication, TrackPublication from .transcription import TranscriptionSegment @@ -39,6 +41,7 @@ TextStreamHandler, ByteStreamHandler, ) +from .log import logger EventTypes = Literal[ @@ -141,6 +144,9 @@ def __init__( self._loop = loop or asyncio.get_event_loop() self._room_queue = BroadcastQueue[proto_ffi.FfiEvent]() self._info = proto_room.RoomInfo() + self._rpc_handlers: Dict[ + str, Callable[[RpcInvocationData], Union[Awaitable[str], str]] + ] = {} self._rpc_invocation_tasks: set[asyncio.Task] = set() self._data_stream_tasks: set[asyncio.Task] = set() @@ -394,7 +400,7 @@ def on_participant_connected(participant): self._connection_state = ConnectionState.CONN_CONNECTED self._local_participant = LocalParticipant( - self._room_queue, cb.connect.result.local_participant + self._room_queue, cb.connect.result.local_participant, self ) for pt in cb.connect.result.participants: @@ -449,6 +455,81 @@ async def disconnect(self) -> None: await self._task FfiClient.instance.queue.unsubscribe(self._ffi_queue) + def register_rpc_method( + self, + method_name: str, + handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None, + ) -> Union[None, Callable]: + """ + Establishes the participant as a receiver for calls of the specified RPC method. + Can be used either as a decorator or a regular method. + + The handler will receive one argument of type `RpcInvocationData` and should return a string response which will be forwarded back to the caller. + + The handler may be synchronous or asynchronous. + + If unable to respond within `response_timeout`, the caller will hang up and receive an error on their side. + + You may raise errors of type `RpcError` in the handler, and they will be forwarded to the caller. + + Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error". + + Args: + method_name (str): The name of the indicated RPC method. + handler (Optional[Callable]): Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax. + + Returns: + None (when used as a decorator it returns the decorator function) + + Example: + # As a decorator: + @room.register_rpc_method("greet") + async def greet_handler(data: RpcInvocationData) -> str: + print(f"Received greeting from {data.caller_identity}: {data.payload}") + return f"Hello, {data.caller_identity}!" + + # As a regular method: + async def greet_handler(data: RpcInvocationData) -> str: + print(f"Received greeting from {data.caller_identity}: {data.payload}") + return f"Hello, {data.caller_identity}!" + + room.register_rpc_method('greet', greet_handler) + """ + + def register(handler_func): + if self._ffi_handle is None: + raise Exception("cannot register RPC method before room is connected") + self._rpc_handlers[method_name] = handler_func + req = proto_ffi.FfiRequest() + req.register_rpc_method.room_handle = self._ffi_handle.handle + req.register_rpc_method.method = method_name + FfiClient.instance.request(req) + + if handler is not None: + register(handler) + return None + else: + # Called as a decorator + return register + + def unregister_rpc_method(self, method: str) -> None: + """ + Unregisters a previously registered RPC method. + + Args: + method (str): The name of the RPC method to unregister + """ + if self._ffi_handle is None: + raise Exception("cannot unregister RPC method before room is connected") + + self._rpc_handlers.pop(method, None) + + req = proto_ffi.FfiRequest() + req.unregister_rpc_method.room_handle = self._ffi_handle.handle + req.unregister_rpc_method.method = method + + FfiClient.instance.request(req) + async def _listen_task(self) -> None: # listen to incoming room events while True: @@ -483,7 +564,7 @@ def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): if rpc_invocation.local_participant_handle == self._local_participant._ffi_handle.handle: task = self._loop.create_task( - self._local_participant._handle_rpc_method_invocation( + self._handle_rpc_method_invocation( rpc_invocation.invocation_id, rpc_invocation.method, rpc_invocation.request_id, @@ -823,6 +904,74 @@ def _create_remote_participant( self._remote_participants[participant.identity] = participant return participant + async def _handle_rpc_method_invocation( + self, + invocation_id: int, + method: str, + request_id: str, + caller_identity: str, + payload: str, + response_timeout: float, + ) -> None: + response_error: Optional[RpcError] = None + response_payload: Optional[str] = None + + params = RpcInvocationData(request_id, caller_identity, payload, response_timeout) + + handler = self._rpc_handlers.get(method) + + if not handler: + response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD) + else: + try: + if asyncio.iscoroutinefunction(handler): + async_handler = cast(Callable[[RpcInvocationData], Awaitable[str]], handler) + + async def run_handler(): + try: + return await async_handler(params) + except asyncio.CancelledError: + # This will be caught by the outer try-except if it's due to timeout + raise + + try: + response_payload = await asyncio.wait_for( + run_handler(), timeout=response_timeout + ) + except asyncio.TimeoutError: + raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT) + except asyncio.CancelledError: + raise RpcError._built_in(RpcError.ErrorCode.RECIPIENT_DISCONNECTED) + else: + sync_handler = cast(Callable[[RpcInvocationData], str], handler) + response_payload = sync_handler(params) + except RpcError as error: + response_error = error + except Exception as error: + logger.exception( + f"Uncaught error returned by RPC handler for {method}. " + "Returning APPLICATION_ERROR instead. " + f"Original error: {error}" + ) + response_error = RpcError._built_in(RpcError.ErrorCode.APPLICATION_ERROR) + + # _local_participant is guaranteed to be set after connect() + assert self._local_participant is not None + req = proto_ffi.FfiRequest( + rpc_method_invocation_response=RpcMethodInvocationResponseRequest( + local_participant_handle=self._local_participant._ffi_handle.handle, + invocation_id=invocation_id, + error=response_error._to_proto() if response_error else None, + payload=response_payload, + ) + ) + + res = FfiClient.instance.request(req) + + if res.rpc_method_invocation_response.error: + message = res.rpc_method_invocation_response.error + logger.exception(f"error sending rpc method invocation response: {message}") + def __repr__(self) -> str: sid = "unknown" if self._first_sid_future.done(): diff --git a/livekit-rtc/rust-sdks b/livekit-rtc/rust-sdks index 3aeced95..1f3d9a3e 160000 --- a/livekit-rtc/rust-sdks +++ b/livekit-rtc/rust-sdks @@ -1 +1 @@ -Subproject commit 3aeced9522a2b06b1d2e7778a06ed2c8ddfaf76f +Subproject commit 1f3d9a3e5b88daabc3c0a48ebd0fbac8d18578b6 diff --git a/livekit-rtc/setup.py b/livekit-rtc/setup.py index 7cf1508c..3795c79e 100644 --- a/livekit-rtc/setup.py +++ b/livekit-rtc/setup.py @@ -58,7 +58,12 @@ def finalize_options(self): license="Apache-2.0", packages=setuptools.find_namespace_packages(include=["livekit.*"]), python_requires=">=3.9.0", - install_requires=["protobuf>=4.25.0", "types-protobuf>=3", "aiofiles>=24"], + install_requires=[ + "protobuf>=4.25.0", + "types-protobuf>=3", + "aiofiles>=24", + "deprecated>=1.2.18", + ], package_data={ "livekit.rtc": ["_proto/*.py", "py.typed", "*.pyi", "**/*.pyi"], "livekit.rtc.resources": ["*.so", "*.dylib", "*.dll", "LICENSE.md", "*.h"],