Skip to content

Latest commit

 

History

History
1092 lines (909 loc) · 20.6 KB

beams.livemd

File metadata and controls

1092 lines (909 loc) · 20.6 KB

Beams Guide

Mix.install(
  [
    {:lux, "~> 0.4.0"}
    {:kino, "~> 0.14.2"}
  ],
  start_applications: false
)
Mix.Task.run("setup", install_deps: false)

Application.put_env(:venomous, :snake_manager, %{
  python_opts: [
    module_paths: [
      Lux.Python.module_path(),
      Lux.Python.module_path(:deps)
    ],
    python_executable: "python3"
  ]
})

Application.ensure_all_started([:lux, :kino, :ex_unit])

Overview

Beams are the orchestration layer of Lux, allowing you to compose Prisms, Lenses, and other components into complex workflows. They support sequential, parallel, and conditional execution with rich error handling and logging.

A Beam consists of:

  • A sequence of steps
  • Input and output schemas
  • Execution configuration
  • Error handling and logging
  • Parameter passing between steps

Creating a Beam

Here's a basic example of a Beam:

defmodule MyApp.Beams.ContentProcessor do
  use Lux.Beam,
    name: "Content Processor",
    description: "Processes and enriches content",
    input_schema: %{
      type: :object,
      properties: %{
        text: %{type: :string},
        language: %{type: :string},
        enrich: %{type: :boolean, default: true}
      },
      required: ["text"]
    },
    output_schema: %{
      type: :object,
      properties: %{
        sentiment: %{type: :string},
        entities: %{type: :array, items: %{type: :string}},
        summary: %{type: :string}
      }
    },
    generate_execution_log: true

  sequence do
    step(:sentiment, Lux.Prisms.SentimentAnalysisPrism, [:input])

    branch {__MODULE__, :should_enrich?} do
      true ->
        parallel do
          step(:entities, MyApp.Prisms.EntityExtraction, [:input], retries: 2)

          step(:summary, MyApp.Prisms.TextSummarization, [:input],
            timeout: :timer.seconds(30)
          )
        end

      false ->
        step(:skip, MyApp.Prisms.NoOp, %{})
    end
  end

  def should_enrich?(ctx) do
    Map.get(ctx.input, :enrich, true)
  end
end

Kino.nothing()

Before run defined Beam, we need to define some Prisms that it will use.

defmodule MyApp.Prisms.EntityExtraction do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: [""]}}
  end
end

defmodule MyApp.Prisms.TextSummarization do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "A short summary"}}
  end
end

defmodule MyApp.Prisms.NoOp do
  use Lux.Prism

  def handler(input, _ctx) do
    {:ok, input}
  end
end

Kino.nothing()

You can run defined Beam with run/2 function. You can inspect entire steps and their outputs in log.

frame = Kino.Frame.new() |> Kino.render()

{:ok, result, log} = MyApp.Beams.ContentProcessor.run(%{
  enrich: true,
  text: "hello world",
  language: "en"
})

Kino.Frame.append(frame, result)
Kino.Frame.append(frame, log)
Kino.nothing()

Step Types

Sequential Steps

Execute steps one after another:

import Lux.Beam

sequence do
  step(:first, FirstPrism, %{param: :value})
  step(:second, SecondPrism, [:steps, :first, :result])
  step(:third, ThirdPrism, [:steps, :third, :result])
end
{:sequence,
 [
   %{
     id: :first,
     module: FirstPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: %{param: :value}
   },
   %{
     id: :second,
     module: SecondPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: [:steps, :first, :result]
   },
   %{
     id: :third,
     module: ThirdPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: [:steps, :third, :result]
   }
 ]}

Parallel Steps

Execute steps concurrently:

parallel do
  step(:analysis, AnalysisPrism, %{data: :input})
  step(:validation, ValidationPrism, %{data: :input})
  step(:enrichment, EnrichmentPrism, %{data: :input})
end
{:parallel,
 [
   %{
     id: :analysis,
     module: AnalysisPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: %{data: :input}
   },
   %{
     id: :validation,
     module: ValidationPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: %{data: :input}
   },
   %{
     id: :enrichment,
     module: EnrichmentPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: %{data: :input}
   }
 ]}

Conditional Steps

Branch based on conditions:

branch {__MODULE__, :check_condition} do
  :path_a ->
    sequence do
      step(:a1, PathAPrism, %{})
      step(:a2, PathAPrism2, %{})
    end

  :path_b ->
    sequence do
      step(:b1, PathBPrism, %{})
      step(:b2, PathBPrism2, %{})
    end

  _ ->
    step(:default, DefaultPrism, %{})
end
{:branch, {nil, :check_condition},
 [
   path_a: {:sequence,
    [
      %{
        id: :a1,
        module: PathAPrism,
        opts: %{
          timeout: 300000,
          dependencies: [],
          fallback: nil,
          retries: 0,
          retry_backoff: 1000,
          track: false,
          store_io: false
        },
        params: %{}
      },
      %{
        id: :a2,
        module: PathAPrism2,
        opts: %{
          timeout: 300000,
          dependencies: [],
          fallback: nil,
          retries: 0,
          retry_backoff: 1000,
          track: false,
          store_io: false
        },
        params: %{}
      }
    ]},
   path_b: {:sequence,
    [
      %{
        id: :b1,
        module: PathBPrism,
        opts: %{
          timeout: 300000,
          dependencies: [],
          fallback: nil,
          retries: 0,
          retry_backoff: 1000,
          track: false,
          store_io: false
        },
        params: %{}
      },
      %{
        id: :b2,
        module: PathBPrism2,
        opts: %{
          timeout: 300000,
          dependencies: [],
          fallback: nil,
          retries: 0,
          retry_backoff: 1000,
          track: false,
          store_io: false
        },
        params: %{}
      }
    ]},
   _: %{
     id: :default,
     module: DefaultPrism,
     opts: %{
       timeout: 300000,
       dependencies: [],
       fallback: nil,
       retries: 0,
       retry_backoff: 1000,
       track: false,
       store_io: false
     },
     params: %{}
   }
 ]}

Parameter References

Basic References

Reference previous step outputs:

step(:data, DataPrism, %{value: :input_value})
step(:process, ProcessPrism, %{data: [:steps, :data, :result]})
%{
  id: :process,
  module: ProcessPrism,
  opts: %{
    timeout: 300000,
    dependencies: [],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{data: [:steps, :data, :result]}
}

Nested References

Access nested values:

step(:complex, ComplexPrism, %{
  value: [:steps, :data, :result, :nested, :value],
  config: [:steps, :settings, :result]
})
%{
  id: :complex,
  module: ComplexPrism,
  opts: %{
    timeout: 300000,
    dependencies: [],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{value: [:steps, :data, :result, :nested, :value], config: [:steps, :settings, :result]}
}

Multiple References

Combine multiple references:

step(:combine, CombinePrism, %{
  first: [:steps, :step1, :result],
  second: [:steps, :step2, :result],
  third: [:steps, :step3, :result]
})
%{
  id: :combine,
  module: CombinePrism,
  opts: %{
    timeout: 300000,
    dependencies: [],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{
    first: [:steps, :step1, :result],
    second: [:steps, :step2, :result],
    third: [:steps, :step3, :result]
  }
}

Step Configuration

Timeouts

step(:long_running, LongPrism, %{}, timeout: :timer.minutes(10))
%{
  id: :long_running,
  module: LongPrism,
  opts: %{
    timeout: 600000,
    dependencies: [],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{}
}

Retries

step(:flaky, FlakyPrism, %{},
  retries: 3,
  retry_backoff: 1000
)
%{
  id: :flaky,
  module: FlakyPrism,
  opts: %{
    timeout: 300000,
    dependencies: [],
    fallback: nil,
    retries: 3,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{}
}

Dependencies

step(:dependent, DependentPrism, %{}, dependencies: ["step1", "step2"])
%{
  id: :dependent,
  module: DependentPrism,
  opts: %{
    timeout: 300000,
    dependencies: ["step1", "step2"],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: false
  },
  params: %{}
}

Execution Logging

step(:important, ImportantPrism, %{}, store_io: true)
%{
  id: :important,
  module: ImportantPrism,
  opts: %{
    timeout: 300000,
    dependencies: [],
    fallback: nil,
    retries: 0,
    retry_backoff: 1000,
    track: false,
    store_io: true
  },
  params: %{}
}

Error Handling

Basic Error Handling

defmodule MyApp.Beams.RobustBeam do
  use Lux.Beam

  sequence do
    step(:risky, MyApp.Prisms.RiskyPrism, %{},
      retries: 3,
      retry_backoff: 1000,
      fallback: MyApp.Fallbacks.RiskyFallback
    )
  end
end

defmodule MyApp.Prisms.RiskyPrism do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:error, :too_risky}
  end
end

defmodule MyApp.Fallbacks.RiskyFallback do
  def handle_error(%{error: error, context: ctx}) do
    case error do
      %{recoverable: true} ->
        {:continue, %{status: :degraded, result: compute_fallback(ctx)}}

      _ ->
        {:stop, "Unrecoverable error: #{inspect(error)}"}
    end
  end

  defp compute_fallback(_ctx) do
    # Compute fallback result
    %{value: 0}
  end
end

MyApp.Beams.RobustBeam.run(%{})
{:error, "Unrecoverable error: :too_risky", nil}

Inline Fallbacks

You can also define fallbacks inline using anonymous functions:

defmodule MyApp.Beams.InlineFallbackBeam do
  use Lux.Beam

  sequence do
    step(:operation, MyApp.Prisms.OperationPrism, %{},
      fallback: fn %{error: error, context: _ctx} ->
        if recoverable?(error) do
          {:continue, %{status: :degraded}}
        else
          {:stop, "Cannot proceed: #{inspect(error)}"}
        end
      end)
  end

  defp recoverable?(%{type: :temporary}), do: true
  defp recoverable?(_), do: false
end

defmodule MyApp.Prisms.OperationPrism do
  use Lux.Prism
  
  def handler(:ok, _ctx) do
    {:ok, %{type: :temporary}}
  end

  def handler(_input, _ctx) do
    {:error, %{type: :temporary}}
  end
end

MyApp.Beams.InlineFallbackBeam.run(%{})
{:ok, %{status: :degraded}, nil}

Fallback Behavior

Fallbacks can:

  • Access the error and context
  • Return {:continue, result} to continue execution
  • Return {:stop, reason} to halt the beam
  • Transform errors into valid results
  • Implement recovery strategies

Custom Error Handling

defmodule MyApp.Beams.ErrorHandlingBeam do
  use Lux.Beam

  sequence do
    step(:operation, MyApp.Prisms.OperationPrism, [:input])
    
    branch {__MODULE__, :handle_error} do
      :retry ->
        step(:retry, MyApp.Prisms.RetryPrism, %{
          original_input: [:steps, :operatoin, :input],
          error: [:steps, :operatoin, :error]
        })

      :fallback ->
        step(:fallback, MyApp.Prisms.FallbackPrism, %{
          error: [:steps, :operatoin, :error]
        })

      :fail ->
        step(:error, MyApp.Prisms.ErrorPrism, %{
          error: [:steps, :operatoin, :error],
          context: :context
        })
    end
  end

  def handle_error(ctx) do
    case ctx.steps.operation.result do
      %{type: :temporary} -> :retry
      %{type: :permanent} -> :fallback
      _ -> :fail
    end
  end
end

defmodule MyApp.Prisms.RetryPrism do
  use Lux.Prism
  
  def handler(_input, _ctx) do
    {:ok, %{type: :retried}}
  end
end

MyApp.Beams.ErrorHandlingBeam.run(:ok)
{:ok, %{type: :retried}, nil}

Best Practices

  1. Step Organization

    • Group related steps together
    • Use meaningful step IDs
    • Keep step configurations clear
    • Document complex workflows
  2. Error Handling

    • Use appropriate retry strategies
    • Implement fallback paths
    • Log errors with context
    • Handle all error cases
  3. Performance

    • Use parallel execution when possible
    • Set appropriate timeouts
    • Monitor execution times
  4. Testing

    • Test happy paths
    • Test error scenarios
    • Test parallel execution
    • Test timeouts and retries

Example test:

defmodule MyApp.Beams.ContentProcessorTest do
  use UnitCase, async: true

  describe "run/2" do
    test "processes content successfully" do
      {:ok, _result, log} = MyApp.Beams.ContentProcessor.run(%{
        text: "Great product!",
        language: "en",
        enrich: true
      })

      steps_by_id = Map.new(log.steps, &{&1.id, &1})

      assert steps_by_id[:sentiment].output["sentiment"] == "positive"
      assert length(steps_by_id[:entities].output.result) > 0
      assert is_binary(steps_by_id[:summary].output.result)
    end

    test "respects enrich flag" do
      {:ok, _result, log} = MyApp.Beams.ContentProcessor.run(%{
        text: "Simple text",
        enrich: false
      })
      
      steps_by_id = Map.new(log.steps, &{&1.id, &1})

      assert steps_by_id[:sentiment]
      refute steps_by_id[:entities]
      refute steps_by_id[:summary]
    end
  end
end

ExUnit.run()
Running ExUnit with seed: 780470, max_cases: 40

..
Finished in 0.00 seconds (0.00s async, 0.00s sync)
2 tests, 0 failures
%{total: 2, failures: 0, excluded: 0, skipped: 0}

Advanced Topics

Complex Workflows

defmodule MyApp.Beams.ComplexWorkflow do
  use Lux.Beam,
    generate_execution_log: true

  sequence do
    parallel do
      step(:data1, DataSource1, %{})
      step(:data2, DataSource2, %{})
      step(:data3, DataSource3, %{})
    end

    step(:validate, DataValidator, %{
      data1: [:steps, :data1, :result],
      data2: [:steps, :data2, :result],
      data3: [:steps, :data3, :result]
    })

    branch {__MODULE__, :process_path} do
      :fast ->
        step(:quick, QuickProcessor, %{
          data: [:steps, :validate, :result]
        })

      :thorough ->
        parallel do
          step(:analysis, DeepAnalysis, %{
            data: [:steps, :validate, :result]
          })

          step(:enrichment, DataEnrichment, %{
            data: [:steps, :validate, :result]
          })

          step(:verification, DataVerification, %{
            data: [:steps, :validate, :result]
          })
        end
    end

    step(:finalize, Finalizer, %{
      result: [:steps, :process_path, :result]
    })
  end

  def process_path(ctx) do
    cond do
      ctx.steps.validate.result.size > 1000 -> :thorough
      true -> :fast
    end
  end
end

Kino.nothing()
defmodule DataSource1 do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, :source1}
  end
end

defmodule DataSource2 do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, :source2}
  end
end

defmodule DataSource3 do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, :source3}
  end
end

defmodule DataValidator do
  use Lux.Prism

  def handler(input, _ctx) do
    {:ok, Map.merge(input, %{size: 2000})}
  end
end

defmodule QuickProcessor do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "quick result"}}
  end
end

defmodule DeepAnalysis do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "analized result"}}
  end
end

defmodule DataEnrichment do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "enriched result"}}
  end
end

defmodule DataVerification do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "verified result"}}
  end
end

defmodule Finalizer do
  use Lux.Prism

  def handler(_input, _ctx) do
    {:ok, %{result: "finalized result"}}
  end
end
{:module, Finalizer, <<70, 79, 82, 49, 0, 0, 10, ...>>, {:handler, 2}}
MyApp.Beams.ComplexWorkflow.run(%{})
{:ok, %{result: "finalized result"},
 %{
   input: %{},
   output: %{result: "finalized result"},
   status: :completed,
   started_at: ~U[2025-02-12 06:30:32.034005Z],
   steps: [
     %{
       error: nil,
       id: :data1,
       input: %{},
       output: :source1,
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034005Z],
       step_index: 0,
       completed_at: ~U[2025-02-12 06:30:32.034027Z]
     },
     %{
       error: nil,
       id: :data2,
       input: %{},
       output: :source2,
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034137Z],
       step_index: 1,
       completed_at: ~U[2025-02-12 06:30:32.034142Z]
     },
     %{
       error: nil,
       id: :data3,
       input: %{},
       output: :source3,
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034240Z],
       step_index: 2,
       completed_at: ~U[2025-02-12 06:30:32.034247Z]
     },
     %{
       error: nil,
       id: :validate,
       input: %{data1: :source1, data2: :source2, data3: :source3},
       output: %{size: 2000, data1: :source1, data2: :source2, data3: :source3},
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034338Z],
       step_index: 3,
       completed_at: ~U[2025-02-12 06:30:32.034343Z]
     },
     %{
       error: nil,
       id: :analysis,
       input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}},
       output: %{result: "analized result"},
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034426Z],
       step_index: 4,
       completed_at: ~U[2025-02-12 06:30:32.034446Z]
     },
     %{
       error: nil,
       id: :enrichment,
       input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}},
       output: %{result: "enriched result"},
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034522Z],
       step_index: 5,
       completed_at: ~U[2025-02-12 06:30:32.034535Z]
     },
     %{
       error: nil,
       id: :verification,
       input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}},
       output: %{result: "verified result"},
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034549Z],
       step_index: 6,
       completed_at: ~U[2025-02-12 06:30:32.034557Z]
     },
     %{
       error: nil,
       id: :finalize,
       input: %{result: nil},
       output: %{result: "finalized result"},
       status: :completed,
       started_at: ~U[2025-02-12 06:30:32.034601Z],
       step_index: 7,
       completed_at: ~U[2025-02-12 06:30:32.034607Z]
     }
   ],
   completed_at: ~U[2025-02-12 06:30:32.034607Z],
   beam_id: "b54a67b8-7da6-4e53-a90c-4363c721a2c3",
   started_by: "system"
 }}

Execution Monitoring

defmodule MyApp.Beams.MonitoredBeam do
  use Lux.Beam,
    generate_execution_log: true,
    monitoring: %{
      metrics: [:duration, :memory, :errors],
      alerts: [
        %{
          condition: &__MODULE__.alert?/1,
          action: &__MODULE__.notify/1
        }
      ]
    }

  sequence do
    step(:operation, MonitoredPrism, %{},
      track: true)
  end

  def alert?(metrics) do
    metrics.duration > :timer.seconds(30) ||
    metrics.memory > 1_000_000_000
  end

  def notify(_metrics) do
    # Send alert
  end
end

Kino.nothing()