@@ -109,11 +109,14 @@ def __init__(self, config: Union[Dict[str, Any], str, Path]):
109109 def resolve_path (self , path : Union [str , Path ]) -> Union [str , Path ]:
110110 """Resolve path relative to config file."""
111111 path_str = str (path )
112-
112+
113113 # URLs and URIs should be returned as-is
114- if any (path_str .startswith (proto ) for proto in ['http://' , 'https://' , 's3://' , 'gs://' ]):
114+ if any (
115+ path_str .startswith (proto )
116+ for proto in ["http://" , "https://" , "s3://" , "gs://" ]
117+ ):
115118 return path_str
116-
119+
117120 # File paths get resolved relative to config
118121 path = Path (path )
119122 if self .config_dir and not path .is_absolute ():
@@ -133,46 +136,51 @@ def run(self) -> pd.DataFrame:
133136 # 1. Load data
134137 path = self .resolve_path (self .config ["data" ]["path" ])
135138 logger .info (f"Loading data from { path } " )
136-
139+
137140 # Check file extension (works for both Path objects and URL strings)
138141 path_str = str (path )
139142 columns = self .config ["data" ].get ("columns" ) # Optional column selection
140-
143+
141144 # Check if lazy filtering is requested for parquet files
142145 use_lazy = self .config ["data" ].get ("use_lazy_filter" , False )
143146 filter_query = self .config ["data" ].get ("filter_query" )
144-
147+
145148 if path_str .endswith (".parquet" ) and use_lazy and filter_query :
146149 # Use polars for lazy filtering
147150 import polars as pl
151+
148152 logger .info (f"Using lazy filter: { filter_query } " )
149-
153+
150154 # Lazy load with polars
151155 lazy_df = pl .scan_parquet (path )
152-
156+
153157 # Apply filter
154158 lazy_df = lazy_df .filter (pl .sql_expr (filter_query ))
155-
159+
156160 # Select columns if specified
157161 if columns :
158162 lazy_df = lazy_df .select (columns )
159-
163+
160164 # Collect and convert to pandas
161165 df = lazy_df .collect ().to_pandas ()
162-
166+
163167 # Log column information
164168 metadata_cols = [col for col in df .columns if col .startswith ("Metadata_" )]
165- feature_cols = [col for col in df .columns if not col .startswith ("Metadata_" )]
166-
167- logger .info (f"Loaded { len (df )} rows after filtering with { len (df .columns )} columns" )
169+ feature_cols = [
170+ col for col in df .columns if not col .startswith ("Metadata_" )
171+ ]
172+
173+ logger .info (
174+ f"Loaded { len (df )} rows after filtering with { len (df .columns )} columns"
175+ )
168176 logger .info (f" Metadata columns (first 5): { metadata_cols [:5 ]} " )
169177 logger .info (f" Feature columns (first 5): { feature_cols [:5 ]} " )
170-
178+
171179 elif path_str .endswith (".parquet" ):
172180 df = pd .read_parquet (path , columns = columns )
173181 else :
174182 df = pd .read_csv (path , usecols = columns )
175-
183+
176184 if not use_lazy or not filter_query :
177185 logger .info (f"Loaded { len (df )} rows with { len (df .columns )} columns" )
178186
0 commit comments