Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ config :livebook,
}}

config :livebook, Livebook.Apps.Manager, retry_backoff_base_ms: 0

# Collapse the Teams connection backoff range to zero so that connection
# retries are scheduled without delay in the test environment
config :livebook, teams_connection_backoff_range_ms: 0..0
6 changes: 6 additions & 0 deletions lib/livebook/hubs/team_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, %{state | connected?: false, connection_status: reason}}
end

# The connection reported that the service is unavailable: broadcast the
# failure for this hub and record the client as disconnected, keeping the
# reason as the connection status.
def handle_info({:service_unavailable, reason}, state) do
  hub_id = state.hub.id
  Hubs.Broadcasts.hub_connection_failed(hub_id, reason)

  disconnected_state = %{state | connected?: false, connection_status: reason}
  {:noreply, disconnected_state}
end

def handle_info({:server_error, reason}, state) do
Hubs.Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}")
:ok = Hubs.delete_hub(state.hub.id)
Expand Down
37 changes: 31 additions & 6 deletions lib/livebook/teams/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,26 @@ defmodule Livebook.Teams.Connection do
send(data.listener, {:connection_error, reason})
Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}")

# Random between 3 and 10 seconds
backoff = Enum.random(3..10) * 1000
{:keep_state_and_data, {{:timeout, :backoff}, backoff, nil}}
{:keep_state_and_data, {{:timeout, :backoff}, backoff_ms(), nil}}

{:server_error, error} ->
reason = LivebookProto.Error.decode(error).details
{:server_error, 503, body} ->
reason = decode_error_reason(body)
send(data.listener, {:service_unavailable, reason})

Logger.warning(
"Teams WebSocket connection - server error - http status 503: #{inspect(reason)}"
)

{:keep_state_and_data, {{:timeout, :backoff}, backoff_ms(), nil}}

{:server_error, status, body} ->
reason = decode_error_reason(body)
send(data.listener, {:server_error, reason})
Logger.warning("Teams WebSocket connection - server error: #{inspect(reason)}")

Logger.warning(
"Teams WebSocket connection - server error - http status #{status}: #{inspect(reason)}"
)

{:keep_state, data}
end
end
Expand Down Expand Up @@ -152,4 +164,17 @@ defmodule Livebook.Teams.Connection do
# Disconnects the WebSocket described by `data` (its HTTP connection,
# websocket and request ref) through `WebSocket.disconnect/3`.
# The `_ =` match marks the disconnect result as intentionally unchecked.
defp ensure_closed(data) do
_ = WebSocket.disconnect(data.http_conn, data.websocket, data.ref)
end

# Extracts the `details` field from a protobuf-encoded error body.
#
# If decoding raises `Protobuf.DecodeError` (the body is not a valid
# `LivebookProto.Error` payload), a generic message that embeds the decode
# failure is returned instead of letting the exception propagate.
defp decode_error_reason(body) do
  decoded = LivebookProto.Error.decode(body)
  decoded.details
rescue
  decode_error in Protobuf.DecodeError ->
    "Server error (unexpected response format), error: #{Exception.message(decode_error)}"
end

# Picks a random backoff delay in milliseconds.
#
# The range is read from the `:teams_connection_backoff_range_ms`
# application env of `:livebook`; when unset it defaults to 3 to 10 seconds.
defp backoff_ms do
  :livebook
  |> Application.get_env(:teams_connection_backoff_range_ms, 3_000..10_000)
  |> Enum.random()
end
end
8 changes: 4 additions & 4 deletions lib/livebook/teams/web_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Livebook.Teams.WebSocket do
@spec connect(list({String.t(), String.t()})) ::
{:ok, conn(), websocket(), ref()}
| {:transport_error, String.t()}
| {:server_error, String.t()}
| {:server_error, integer(), String.t()}
def connect(headers \\ []) do
uri = URI.parse(Livebook.Config.teams_url())
{http_scheme, ws_scheme} = parse_scheme(uri)
Expand Down Expand Up @@ -77,9 +77,9 @@ defmodule Livebook.Teams.WebSocket do
%{body: []} ->
handle_upgrade_responses(responses, conn, ref, state)

%{status: _} ->
%{status: status} ->
Mint.HTTP.close(conn)
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
{:server_error, status, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
end
end

Expand All @@ -98,7 +98,7 @@ defmodule Livebook.Teams.WebSocket do

{:error, conn, %UpgradeFailureError{}} ->
Mint.HTTP.close(conn)
{:server_error, state.body |> Enum.reverse() |> IO.iodata_to_binary()}
{:server_error, state.status, state.body |> Enum.reverse() |> IO.iodata_to_binary()}

{:error, conn, exception} ->
Mint.HTTP.close(conn)
Expand Down
86 changes: 86 additions & 0 deletions test/livebook/teams/connection_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Livebook.Teams.ConnectionTest do
  # Not async: every test points the global :teams_url application env at a
  # local error server.
  use ExUnit.Case, async: false

  alias Livebook.Teams.Connection

  @moduletag :capture_log

  setup do
    # Use fetch_env/2 so that "unset" can be told apart from "set". Restoring
    # an unset key with put_env/3 would store an explicit nil, which then
    # shadows any default passed to Application.get_env/3 in later tests.
    original_url = Application.fetch_env(:livebook, :teams_url)

    on_exit(fn ->
      case original_url do
        {:ok, url} -> Application.put_env(:livebook, :teams_url, url)
        :error -> Application.delete_env(:livebook, :teams_url)
      end
    end)

    :ok
  end

  describe "server error handling" do
    test "handles invalid protobuf without crashing" do
      # A body that is not a protobuf-encoded LivebookProto.Error
      html_error = """
      <!DOCTYPE html>
      <html><body><h1>500 Internal Server Error</h1></body></html>
      """

      start_error_server(500, html_error, "text/html")

      {:ok, conn_pid} = Connection.start_link(self(), [{"x-test", "true"}])

      assert_receive {:server_error, error_message}, 5000
      assert error_message =~ "Server error (unexpected response format)"
      assert Process.alive?(conn_pid)
    end

    test "sends :service_unavailable on 503 and retries with backoff" do
      error = %LivebookProto.Error{details: "Service temporarily unavailable"}
      encoded = LivebookProto.Error.encode(error)

      start_error_server(503, encoded, "application/octet-stream")

      {:ok, conn_pid} = Connection.start_link(self(), [{"x-test", "true"}])

      # First attempt
      assert_receive {:service_unavailable, "Service temporarily unavailable"}, 1000
      assert Process.alive?(conn_pid)

      # Retry after backoff
      assert_receive {:service_unavailable, _}, 1000
      assert Process.alive?(conn_pid)
    end
  end

  # Starts a supervised Bandit server on a free local port that answers every
  # request with the given status, body and content type, and points
  # :teams_url at it.
  defp start_error_server(status_code, body, content_type) do
    port = get_free_port()
    Application.put_env(:livebook, :teams_url, "http://localhost:#{port}")

    plug =
      {__MODULE__.ErrorPlug, status_code: status_code, body: body, content_type: content_type}

    start_supervised!({Bandit, plug: plug, port: port, startup_log: false})
  end

  # Asks the OS for an ephemeral port by listening on port 0, then releases
  # the socket and returns the port number.
  defp get_free_port do
    {:ok, socket} = :gen_tcp.listen(0, [])
    {:ok, port} = :inet.port(socket)
    :gen_tcp.close(socket)
    port
  end

  defmodule ErrorPlug do
    # Plug that replies to every request with the status code, body and
    # content type given in its options.
    @behaviour Plug

    @impl true
    def init(opts), do: opts

    @impl true
    def call(conn, opts) do
      status_code = Keyword.fetch!(opts, :status_code)
      body = Keyword.fetch!(opts, :body)
      content_type = Keyword.fetch!(opts, :content_type)

      conn
      |> Plug.Conn.put_resp_content_type(content_type)
      |> Plug.Conn.send_resp(status_code, body)
    end
  end
end
23 changes: 23 additions & 0 deletions test/livebook_teams/hubs/team_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,29 @@ defmodule Livebook.Hubs.TeamClientTest do
start_supervised!({TeamClient, team})
assert_receive {:hub_server_error, ^id, ^error}
end

test "handles service unavailable without deleting hub", %{team: team} do
  hub_id = team.id
  message = "Service temporarily unavailable. Please try again."

  assert {:ok, client} = TeamClient.start_link(team)
  assert_receive {:hub_connected, ^hub_id}
  assert_receive {:client_connected, ^hub_id}

  # Mimic the Connection process reporting that the service is unavailable
  send(client, {:service_unavailable, message})

  # The failure is surfaced as a connection failure for this hub...
  assert_receive {:hub_connection_failed, ^hub_id, ^message}

  # ...and no hub deletion is broadcast, in contrast to :server_error
  refute_receive {:hub_deleted, ^hub_id}

  # The client process is still alive afterwards
  assert Process.alive?(client)

  TeamClient.stop(hub_id)
end
end

describe "handle user_connected event" do
Expand Down