diff --git a/examples.livemd b/examples.livemd
index 8d4b59e5..e08633a8 100644
--- a/examples.livemd
+++ b/examples.livemd
@@ -252,8 +252,226 @@ defmodule OpenAIWebSocket do
 end
 ```
+```elixir
+defmodule ElevenLabsWebSocket do
+  use WebSockex
+  require Logger
+
+  def start_link(opts) do
+    # ElevenLabs Conversational AI WebSocket; the agent is identified by agent_id in the URL
+    WebSockex.start_link(
+      "wss://api.elevenlabs.io/v1/convai/conversation?agent_id=#{opts["agent_id"]}",
+      __MODULE__,
+      %{response: <<>>}
+    )
+  end
+
+  def send_audio(ws, audio) do
+    audio = Base.encode64(audio)
+    frame = %{user_audio_chunk: audio} |> Jason.encode!()
+    WebSockex.send_frame(ws, {:text, frame})
+  end
+
+  def get_response_chunk(ws, chunk_byte_size) do
+    # There's no 'call' in WebSockex, so we just send and receive
+    send(ws, {:get_response_chunk, chunk_byte_size, self()})
+
+    receive do
+      {:response_chunk, chunk} -> chunk
+    end
+  end
+
+  @impl true
+  def handle_frame({:text, frame}, state) do
+    case Jason.decode!(frame) do
+      %{"type" => "audio", "audio_event" => audio_event} ->
+        audio_payload = Base.decode64!(audio_event["audio_base_64"])
+        # Buffer the response audio
+        response = state.response <> audio_payload
+        {:ok, %{state | response: response}}
+
+      %{"type" => "interruption"} ->
+        # If the user speaks, they may interrupt the current response,
+        # so we drop it and wait for a new one.
+        {:ok, %{state | response: <<>>}}
+
+      %{"type" => "user_transcript", "user_transcription_event" => transcript} ->
+        Logger.info("User transcript: #{transcript["user_transcript"]}")
+        {:ok, state}
+
+      %{} = _event ->
+        {:ok, state}
+    end
+  end
+
+  @impl true
+  def handle_frame(_frame, state), do: {:ok, state}
+
+  @impl true
+  def handle_info({:get_response_chunk, size, pid}, state) do
+    case state.response do
+      <<chunk::binary-size(size), rest::binary>> ->
+        # If we have enough data, send it back
+        send(pid, {:response_chunk, chunk})
+        {:ok, %{state | response: rest}}
+
+      chunk ->
+        # Otherwise, send what we have, padded with silence
+        silence = <<0::size(size - byte_size(chunk))-unit(8)>>
+        send(pid, {:response_chunk, chunk <> silence})
+        {:ok, %{state | response: <<>>}}
+    end
+  end
+end
+```
+
+```elixir
+defmodule UltravoxWebSocket do
+  use WebSockex
+  require Logger
+
+  def start_link(opts) do
+    joinUrl = opts[:joinUrl]
+
+    IO.inspect(joinUrl, label: "Ultravox join URL")
+
+    WebSockex.start_link(
+      joinUrl,
+      __MODULE__,
+      %{response: <<>>}
+    )
+  end
+
+  def send_audio(ws, audio) do
+    # Ultravox expects raw audio bytes as binary frames
+    WebSockex.send_frame(ws, {:binary, audio})
+  end
+
+  def get_response_chunk(ws, chunk_byte_size) do
+    # There's no 'call' in WebSockex, so we just send and receive
+    send(ws, {:get_response_chunk, chunk_byte_size, self()})
+
+    receive do
+      {:response_chunk, chunk} -> chunk
+    end
+  end
+
+  @impl true
+  def handle_frame({:text, frame}, state) do
+    case Jason.decode!(frame) do
+      %{"type" => "playback_clear_buffer"} ->
+        # If the user speaks, they may interrupt the current response,
+        # so we drop it and wait for a new one.
+        {:ok, %{state | response: <<>>}}
+
+      %{"type" => "transcript", "delta" => delta} ->
+        Logger.info("Transcript delta: #{delta}")
+        {:ok, state}
+
+      %{} = _event ->
+        {:ok, state}
+    end
+  end
+
+  @impl true
+  def handle_frame({:binary, frame}, state) do
+    # Buffer the response audio
+    response = state.response <> frame
+    {:ok, %{state | response: response}}
+  end
+
+  @impl true
+  def handle_frame(_frame, state), do: {:ok, state}
+
+  @impl true
+  def handle_info({:get_response_chunk, size, pid}, state) do
+    case state.response do
+      <<chunk::binary-size(size), rest::binary>> ->
+        # If we have enough data, send it back
+        send(pid, {:response_chunk, chunk})
+        {:ok, %{state | response: rest}}
+
+      chunk ->
+        # Otherwise, send what we have, padded with silence
+        silence = <<0::size(size - byte_size(chunk))-unit(8)>>
+        send(pid, {:response_chunk, chunk <> silence})
+        {:ok, %{state | response: <<>>}}
+    end
+  end
+
+  def handle_info(:close_socket, state) do
+    Logger.info("Local close socket")
+    {:close, state}
+  end
+
+  @impl true
+  def handle_connect(_conn, state) do
+    Logger.info("Connected!")
+    {:ok, state}
+  end
+
+  @impl true
+  def handle_disconnect(%{reason: {:local, reason}}, state) do
+    Logger.info("Local close with reason: #{inspect(reason)}")
+    {:ok, state}
+  end
+
+  @impl true
+  def handle_disconnect(disconnect_map, state) do
+    super(disconnect_map, state)
+  end
+
+  @impl true
+  def terminate(reason, _state) do
+    IO.puts("WebSockex terminating with reason: #{inspect(reason)}")
+    exit(:normal)
+  end
+end
+```
+
+For Ultravox, you first need to create a call via the REST API to obtain a `joinUrl` (replace `"UltraVox_API_KEY"` with your Ultravox API key):
+
+```elixir
+resp =
+  Req.post!("https://api.ultravox.ai/api/calls",
+    headers: %{
+      "X-API-Key" => "UltraVox_API_KEY",
+      "Content-Type" => "application/json"
+    },
+    json: %{
+      systemPrompt: "You are a helpful assistant",
+      model: "fixie-ai/ultravox",
+      voice: "Eric",
+      medium: %{
+        serverWebSocket: %{
+          inputSampleRate: 16000,
+          outputSampleRate: 24000,
+          clientBufferSizeMs: 30000
+        }
+      },
+      inactivityMessages: [
+        %{
+          duration: "10s",
+          message: "Thank you for calling. Have a great day. Goodbye.",
+          endBehavior: "END_BEHAVIOR_HANG_UP_SOFT"
+        }
+      ]
+    }
+  )
+
+{:ok, ws} = UltravoxWebSocket.start_link(joinUrl: resp.body["joinUrl"])
+```
+
 In the cell below, we receive stream from the browser via WebRTC, feed it to the API, receive response and send it back to the browser. You need to add the Open AI API token as a `OPEN_AI_TOKEN` secret in Livebook for this to work. To connect via WebRTC, visit http://localhost:1234/talk_to_llm.html after running this cell
 
+Change the WebSocket module and the `audio_rate` according to the provider being used (OpenAI, ElevenLabs, Ultravox, etc.); see the commented-out alternatives in the cell below.
+
 ```elixir
 {:ok, ws} = OpenAIWebSocket.start_link(token: System.fetch_env!("LB_OPEN_AI_TOKEN"))
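+# A rough sketch of the alternative providers defined above (placeholder values,
+# adjust to your setup):
+#
+#   # ElevenLabs - "YOUR_AGENT_ID" is a placeholder for your agent's id
+#   {:ok, ws} = ElevenLabsWebSocket.start_link(%{"agent_id" => "YOUR_AGENT_ID"})
+#
+#   # Ultravox - reuses `resp` from the call created in the cell above
+#   {:ok, ws} = UltravoxWebSocket.start_link(joinUrl: resp.body["joinUrl"])
+#
+# Remember to also set `audio_rate` to the provider's output rate, e.g. the
+# Ultravox call above is configured with outputSampleRate: 24000.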