4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,10 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- `on_missing_price` config option - control behavior when `get_price()` cannot find data: `"raise"` (default), `"warn"`, or `"ignore"`
- `PolarsDataFeed` - Load historical data directly from Polars DataFrames

### Changed
- Replaced `Make` with `just` for development commands

### Deprecated
- `CSVDataFeed` - Use `PolarsDataFeed` instead for loading data from CSV files

## [0.2.0] - 2025-11-11

### Added
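A minimal sketch of the `on_missing_price` option described in this changelog entry. The constructor argument and the return value of 0.0 for `"warn"` come from the tests further down; the `get_price(symbol, timestamp)` call signature and the lookup timestamp are assumptions, since the test file's `get_price` calls are truncated here:

```python
from datetime import datetime

from alphaflow import AlphaFlow
from alphaflow.data_feeds import PolarsDataFeed

# Per the tests below, "warn" logs a warning and returns 0.0 instead of
# raising when get_price() cannot find data ("ignore" does so silently).
af = AlphaFlow(on_missing_price="warn")
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)

# Assumed signature: get_price(symbol, timestamp). A timestamp far past the
# data's end should exercise the missing-price path.
price = af.get_price("AAPL", datetime(2100, 1, 1))  # hypothetical arguments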
6 changes: 3 additions & 3 deletions README.md
@@ -124,7 +124,7 @@

from alphaflow import AlphaFlow
from alphaflow.brokers import SimpleBroker
from alphaflow.data_feeds import CSVDataFeed
from alphaflow.data_feeds import PolarsDataFeed
from alphaflow.strategies import BuyAndHoldStrategy

# 1. Initialize AlphaFlow
@@ -135,8 +135,8 @@

# 2. Create DataFeed (e.g., CSV-based daily bars)
flow.set_data_feed(
CSVDataFeed(
file_path="historical_data.csv",
PolarsDataFeed(
df_or_file_path="historical_data.csv",
)
)

3 changes: 2 additions & 1 deletion alphaflow/data_feeds/__init__.py
@@ -3,6 +3,7 @@
from alphaflow.data_feeds.alpha_vantage_data_feed import AlphaVantageFeed
from alphaflow.data_feeds.csv_data_feed import CSVDataFeed
from alphaflow.data_feeds.fmp_data_feed import FMPDataFeed
from alphaflow.data_feeds.polars_data_feed import PolarsDataFeed
from alphaflow.data_feeds.polygon_data_feed import PolygonDataFeed

__all__ = ["AlphaVantageFeed", "CSVDataFeed", "FMPDataFeed", "PolygonDataFeed"]
__all__ = ["AlphaVantageFeed", "CSVDataFeed", "FMPDataFeed", "PolarsDataFeed", "PolygonDataFeed"]
92 changes: 16 additions & 76 deletions alphaflow/data_feeds/csv_data_feed.py
@@ -1,19 +1,17 @@
"""CSV file data feed implementation."""

import logging
from collections.abc import Generator
from datetime import datetime
from pathlib import Path

import polars as pl
from typing_extensions import deprecated

from alphaflow import DataFeed
from alphaflow.events.market_data_event import MarketDataEvent
from alphaflow.data_feeds.polars_data_feed import PolarsDataFeed

logger = logging.getLogger(__name__)


class CSVDataFeed(DataFeed):
@deprecated("CSVDataFeed is deprecated and will be removed in a future release. Please use PolarsDataFeed instead.")
class CSVDataFeed(PolarsDataFeed):
"""Data feed that loads market data from CSV files."""

def __init__(
@@ -30,6 +28,8 @@ def __init__(
) -> None:
"""Initialize the CSV data feed.

**Deprecated**: Use PolarsDataFeed instead.

Args:
file_path: Path to the CSV file containing market data.
col_timestamp: Name of the timestamp column.
@@ -42,73 +42,13 @@ def __init__(

"""
self.file_path = Path(file_path) if isinstance(file_path, str) else file_path
self._col_timestamp = col_timestamp
self._col_symbol = col_symbol
self._col_open = col_open
self._col_high = col_high
self._col_low = col_low
self._col_close = col_close
self._col_volume = col_volume

def run(
self,
symbol: str,
start_timestamp: datetime | None,
end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
"""Load and yield market data events from the CSV file.

Args:
symbol: The ticker symbol to load data for.
start_timestamp: Optional start time for filtering data.
end_timestamp: Optional end time for filtering data.

Yields:
MarketDataEvent objects containing OHLCV data.

Raises:
ValueError: If required columns are missing from the CSV.

"""
logger.debug("Opening CSV file...")
df = pl.read_csv(self.file_path, try_parse_dates=True)

required_cols = {
self._col_timestamp,
self._col_close,
self._col_high,
self._col_low,
self._col_open,
self._col_volume,
}

missing_cols = required_cols.difference(df.columns)
if missing_cols:
raise ValueError(f"Missing columns: {missing_cols}")

# Convert date column to datetime if needed (polars parses as date by default)
if df[self._col_timestamp].dtype == pl.Date:
df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))

# Filter by symbol using polars
if self._col_symbol in df.columns:
df = df.filter(pl.col(self._col_symbol) == symbol)

# Filter by timestamp bounds using polars
if start_timestamp:
df = df.filter(pl.col(self._col_timestamp) >= start_timestamp)
if end_timestamp:
df = df.filter(pl.col(self._col_timestamp) <= end_timestamp)

# Convert to dicts once after all filtering
for row in df.iter_rows(named=True):
event = MarketDataEvent(
timestamp=row[self._col_timestamp],
symbol=symbol,
open=row[self._col_open],
high=row[self._col_high],
low=row[self._col_low],
close=row[self._col_close],
volume=row[self._col_volume],
)
yield event
super().__init__(
df_or_file_path=file_path,
col_timestamp=col_timestamp,
col_symbol=col_symbol,
col_open=col_open,
col_high=col_high,
col_low=col_low,
col_close=col_close,
col_volume=col_volume,
)
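Since `CSVDataFeed` is now a thin subclass of `PolarsDataFeed` wrapped in `typing_extensions.deprecated`, instantiating it should emit a `DeprecationWarning` at runtime. A small sketch of how a caller might observe that, assuming the decorator's documented runtime behavior (the file path is hypothetical and is not read during `__init__`, which only stores it):

```python
import warnings

from alphaflow.data_feeds import CSVDataFeed

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    feed = CSVDataFeed(file_path="historical_data.csv")  # hypothetical path

# typing_extensions.deprecated wraps the class so construction warns with
# the message passed to the decorator.
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```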
125 changes: 125 additions & 0 deletions alphaflow/data_feeds/polars_data_feed.py
@@ -0,0 +1,125 @@
"""Polars data feed implementation."""

import logging
from collections.abc import Generator
from datetime import datetime
from pathlib import Path

import polars as pl

from alphaflow import DataFeed
from alphaflow.events.market_data_event import MarketDataEvent

logger = logging.getLogger(__name__)


class PolarsDataFeed(DataFeed):
"""Data feed that loads market data from Polars dataframes."""

def __init__(
self,
df_or_file_path: Path | str | pl.DataFrame | pl.LazyFrame,
*,
col_timestamp: str = "Date",
col_symbol: str = "Symbol",
col_open: str = "Open",
col_high: str = "High",
col_low: str = "Low",
col_close: str = "Close",
col_volume: str = "Volume",
) -> None:
"""Initialize the Polars data feed.

Args:
df_or_file_path: Polars dataframe or path to the Polars dataframe containing market data.
Copilot AI commented on Dec 24, 2025:

The docstring incorrectly refers to "path to the Polars dataframe" when it should say "path to a CSV or Parquet file". Polars DataFrames are in-memory data structures, not files on disk. The parameter can accept either a DataFrame/LazyFrame or a file path to CSV/Parquet files.
col_timestamp: Name of the timestamp column.
col_symbol: Name of the symbol column.
col_open: Name of the open price column.
col_high: Name of the high price column.
col_low: Name of the low price column.
col_close: Name of the close price column.
col_volume: Name of the volume column.

"""
self.df_or_file_path = df_or_file_path
self._col_timestamp = col_timestamp
self._col_symbol = col_symbol
self._col_open = col_open
self._col_high = col_high
self._col_low = col_low
self._col_close = col_close
self._col_volume = col_volume

def run(
self,
symbol: str,
start_timestamp: datetime | None,
end_timestamp: datetime | None,
) -> Generator[MarketDataEvent, None, None]:
"""Load and yield market data events from the Polars dataframe.

Args:
symbol: The ticker symbol to load data for.
start_timestamp: Optional start time for filtering data.
end_timestamp: Optional end time for filtering data.

Yields:
MarketDataEvent objects containing OHLCV data.

Raises:
ValueError: If required columns are missing from the Polars dataframe.

"""
if isinstance(self.df_or_file_path, (str, Path)):
df_path = Path(self.df_or_file_path) if isinstance(self.df_or_file_path, str) else self.df_or_file_path
if df_path.suffix in {".parquet", ".parq"}:
df = pl.read_parquet(df_path)
df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))
Copilot AI commented on Dec 24, 2025:

The duplicate datetime casting for parquet files is unnecessary. Line 77 casts the timestamp column to Datetime after reading parquet, but lines 101-102 already handle this case for all file types when the dtype is pl.Date. The first cast should be removed to avoid the redundant operation.

Suggested change (remove this line):
df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))
elif df_path.suffix == ".csv":
df = pl.read_csv(df_path, try_parse_dates=True)
else:
raise ValueError(f"Unsupported file format: {df_path.suffix}")
elif isinstance(self.df_or_file_path, pl.LazyFrame):
df = self.df_or_file_path.collect()
else:
df = self.df_or_file_path

required_cols = {
self._col_timestamp,
self._col_close,
self._col_high,
self._col_low,
self._col_open,
self._col_volume,
}

missing_cols = required_cols.difference(df.columns)
if missing_cols:
raise ValueError(f"Missing columns: {missing_cols}")

# Convert date column to datetime if needed (polars parses as date by default)
if df[self._col_timestamp].dtype == pl.Date:
df = df.with_columns(pl.col(self._col_timestamp).cast(pl.Datetime))

# Filter by symbol using polars
if self._col_symbol in df.columns:
df = df.filter(pl.col(self._col_symbol) == symbol)

# Filter by timestamp bounds using polars
if start_timestamp:
df = df.filter(pl.col(self._col_timestamp) >= start_timestamp)
if end_timestamp:
df = df.filter(pl.col(self._col_timestamp) <= end_timestamp)

# Convert to dicts once after all filtering
for row in df.sort(by=self._col_timestamp).iter_rows(named=True):
event = MarketDataEvent(
timestamp=row[self._col_timestamp],
symbol=symbol,
open=row[self._col_open],
high=row[self._col_high],
low=row[self._col_low],
close=row[self._col_close],
volume=row[self._col_volume],
)
yield event
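The headline capability of the new feed is backtesting directly from an in-memory frame rather than a file. A rough usage sketch built from the constructor and `run()` signatures above; the sample rows are illustrative, and the `event.timestamp` / `event.close` attribute access assumes `MarketDataEvent` exposes its constructor fields:

```python
from datetime import datetime

import polars as pl

from alphaflow.data_feeds import PolarsDataFeed

# Column names match the constructor defaults (Date, Symbol, Open, ...).
df = pl.DataFrame(
    {
        "Date": [datetime(2024, 1, 2), datetime(2024, 1, 3)],
        "Symbol": ["AAPL", "AAPL"],
        "Open": [185.0, 184.2],
        "High": [186.7, 185.9],
        "Low": [183.9, 183.4],
        "Close": [185.6, 184.3],
        "Volume": [52_000_000, 48_500_000],
    }
)

feed = PolarsDataFeed(df)

# run() filters by symbol and the optional timestamp bounds, sorts by
# timestamp, and yields one MarketDataEvent per row.
for event in feed.run("AAPL", start_timestamp=None, end_timestamp=None):
    print(event.timestamp, event.close)
```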
20 changes: 10 additions & 10 deletions alphaflow/tests/test_alphaflow.py
@@ -8,7 +8,7 @@
from alphaflow import AlphaFlow
from alphaflow.analyzers import DefaultAnalyzer
from alphaflow.brokers import SimpleBroker
from alphaflow.data_feeds import CSVDataFeed
from alphaflow.data_feeds import PolarsDataFeed
from alphaflow.strategies import BuyAndHoldStrategy


@@ -50,7 +50,7 @@ def test_alphaflow_add_equity() -> None:
def test_alphaflow_set_data_feed() -> None:
"""Test setting the data feed."""
af = AlphaFlow()
data_feed = CSVDataFeed("alphaflow/tests/data/AAPL.csv")
data_feed = PolarsDataFeed("alphaflow/tests/data/AAPL.csv")

af.set_data_feed(data_feed)

@@ -174,7 +174,7 @@ def test_alphaflow_set_backtest_end_timestamp_string() -> None:
def test_alphaflow_get_timestamps() -> None:
"""Test getting all timestamps from loaded data."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)
af.set_data_start_timestamp(datetime(1980, 12, 25))
@@ -192,7 +192,7 @@ def test_alphaflow_get_timestamps() -> None:
def test_alphaflow_get_price() -> None:
"""Test getting price for a symbol at a timestamp."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)
af.set_data_start_timestamp(datetime(1980, 12, 25))
@@ -207,7 +207,7 @@ def test_alphaflow_get_price() -> None:
def test_alphaflow_get_price_raises_error_for_missing_data() -> None:
"""Test get_price raises error when no data exists after timestamp."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)
af.set_data_start_timestamp(datetime(1980, 12, 25))
@@ -222,7 +222,7 @@ def test_alphaflow_get_price_raises_error_for_missing_data() -> None:
def test_alphaflow_on_missing_price_warn(caplog: pytest.LogCaptureFixture) -> None:
"""Test that on_missing_price='warn' logs a warning and returns 0.0."""
af = AlphaFlow(on_missing_price="warn")
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)
af.set_data_start_timestamp(datetime(1980, 12, 25))
@@ -239,7 +239,7 @@ def test_alphaflow_on_missing_price_warn(caplog: pytest.LogCaptureFixture) -> None:
def test_alphaflow_on_missing_price_ignore() -> None:
"""Test that on_missing_price='ignore' silently returns 0.0."""
af = AlphaFlow(on_missing_price="ignore")
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)
af.set_data_start_timestamp(datetime(1980, 12, 25))
@@ -269,7 +269,7 @@ def test_alphaflow_run_raises_error_without_data_feed() -> None:
def test_alphaflow_run_raises_error_for_live_trading() -> None:
"""Test run raises error for live trading (not implemented)."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_cash(10000)

@@ -280,7 +280,7 @@ def test_alphaflow_run_raises_error_for_live_trading() -> None:
def test_alphaflow_complete_backtest_flow() -> None:
"""Test complete backtest flow with all components."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.set_benchmark("AAPL")
af.add_strategy(BuyAndHoldStrategy(symbol="AAPL", target_weight=1.0))
@@ -307,7 +307,7 @@ def test_alphaflow_complete_backtest_flow() -> None:
def test_simple_backtest() -> None:
"""Test a simple buy-and-hold backtest with AAPL."""
af = AlphaFlow()
af.set_data_feed(CSVDataFeed("alphaflow/tests/data/AAPL.csv"))
af.set_data_feed(PolarsDataFeed("alphaflow/tests/data/AAPL.csv"))
af.add_equity("AAPL")
af.add_strategy(BuyAndHoldStrategy(symbol="AAPL", target_weight=1.0))
af.set_broker(SimpleBroker())