@@ -31,9 +31,11 @@ use datafusion::common::UnnestOptions;
 use datafusion::config::{CsvOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::TableProvider;
+use datafusion::error::DataFusionError;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use datafusion::prelude::*;
+use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
@@ -70,6 +72,9 @@ impl PyTableProvider {
         PyTable::new(table_provider)
     }
 }
+const MAX_TABLE_BYTES_TO_DISPLAY: usize = 2 * 1024 * 1024; // 2 MB
+const MIN_TABLE_ROWS_TO_DISPLAY: usize = 20;
+const MAX_LENGTH_CELL_WITHOUT_MINIMIZE: usize = 25;
 
 /// A PyDataFrame is a representation of a logical plan and an API to compose statements.
 /// Use it to build a plan and `.collect()` to execute the plan and collect the result.
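The doc comment above describes the intended build-then-collect flow. As a rough illustration of that pattern on the underlying Rust DataFrame API (a minimal sketch, not part of this change; it assumes a SessionContext with a hypothetical registered table named "example"):

    use datafusion::error::DataFusionError;
    use datafusion::prelude::*;

    // Build a logical plan lazily, then execute it with `.collect()`.
    async fn preview(ctx: &SessionContext) -> Result<(), DataFusionError> {
        let df = ctx.table("example").await?.limit(0, Some(10))?;
        let batches = df.collect().await?; // runs the plan, gathers RecordBatches
        let displ = datafusion::arrow::util::pretty::pretty_format_batches(&batches)?;
        println!("{displ}");
        Ok(())
    }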
@@ -111,56 +116,151 @@ impl PyDataFrame {
     }
 
     fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
-        let df = self.df.as_ref().clone().limit(0, Some(10))?;
-        let batches = wait_for_future(py, df.collect())?;
-        let batches_as_string = pretty::pretty_format_batches(&batches);
-        match batches_as_string {
-            Ok(batch) => Ok(format!("DataFrame()\n{batch}")),
-            Err(err) => Ok(format!("Error: {:?}", err.to_string())),
+        let (batches, has_more) = wait_for_future(
+            py,
+            collect_record_batches_to_display(self.df.as_ref().clone(), 10, 10),
+        )?;
+        if batches.is_empty() {
+            // This should not be reached, but do it for safety since we index into the vector below
+            return Ok("No data to display".to_string());
         }
-    }
 
-    fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
-        let mut html_str = "<table border='1'>\n".to_string();
+        let batches_as_displ =
+            pretty::pretty_format_batches(&batches).map_err(py_datafusion_err)?;
+
+        let additional_str = match has_more {
+            true => "\nData truncated.",
+            false => "",
+        };
 
-        let df = self.df.as_ref().clone().limit(0, Some(10))?;
-        let batches = wait_for_future(py, df.collect())?;
+        Ok(format!("DataFrame()\n{batches_as_displ}{additional_str}"))
+    }
 
+    fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
+        let (batches, has_more) = wait_for_future(
+            py,
+            collect_record_batches_to_display(
+                self.df.as_ref().clone(),
+                MIN_TABLE_ROWS_TO_DISPLAY,
+                usize::MAX,
+            ),
+        )?;
         if batches.is_empty() {
-            html_str.push_str("</table>\n");
-            return Ok(html_str);
+            // This should not be reached, but do it for safety since we index into the vector below
+            return Ok("No data to display".to_string());
         }
 
+        let table_uuid = uuid::Uuid::new_v4().to_string();
+
+        let mut html_str = "
+            <style>
+            .expandable-container {
+                display: inline-block;
+                max-width: 200px;
+            }
+            .expandable {
+                white-space: nowrap;
+                overflow: hidden;
+                text-overflow: ellipsis;
+                display: block;
+            }
+            .full-text {
+                display: none;
+                white-space: normal;
+            }
+            .expand-btn {
+                cursor: pointer;
+                color: blue;
+                text-decoration: underline;
+                border: none;
+                background: none;
+                font-size: inherit;
+                display: block;
+                margin-top: 5px;
+            }
+            </style>
+
+            <div style=\"width: 100%; max-width: 1000px; max-height: 300px; overflow: auto; border: 1px solid #ccc;\">
+            <table style=\"border-collapse: collapse; min-width: 100%\">
+            <thead>\n".to_string();
+
         let schema = batches[0].schema();
 
         let mut header = Vec::new();
         for field in schema.fields() {
-            header.push(format!("<th>{}</td>", field.name()));
+            header.push(format!("<th style='border: 1px solid black; padding: 8px; text-align: left; background-color: #f2f2f2; white-space: nowrap; min-width: fit-content; max-width: fit-content;'>{}</th>", field.name()));
         }
         let header_str = header.join("");
-        html_str.push_str(&format!("<tr>{}</tr>\n", header_str));
-
-        for batch in batches {
-            let formatters = batch
-                .columns()
-                .iter()
-                .map(|c| ArrayFormatter::try_new(c.as_ref(), &FormatOptions::default()))
-                .map(|c| {
-                    c.map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))
-                })
-                .collect::<Result<Vec<_>, _>>()?;
-
-            for row in 0..batch.num_rows() {
+        html_str.push_str(&format!("<tr>{}</tr></thead><tbody>\n", header_str));
+
+        let batch_formatters = batches
+            .iter()
+            .map(|batch| {
+                batch
+                    .columns()
+                    .iter()
+                    .map(|c| ArrayFormatter::try_new(c.as_ref(), &FormatOptions::default()))
+                    .map(|c| {
+                        c.map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))
+                    })
+                    .collect::<Result<Vec<_>, _>>()
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let rows_per_batch = batches.iter().map(|batch| batch.num_rows());
+
+        // We need to build up the HTML output row by row
+        let mut table_row = 0;
+        for (batch_formatter, num_rows_in_batch) in batch_formatters.iter().zip(rows_per_batch) {
+            for batch_row in 0..num_rows_in_batch {
+                table_row += 1;
                 let mut cells = Vec::new();
-                for formatter in &formatters {
-                    cells.push(format!("<td>{}</td>", formatter.value(row)));
+                for (col, formatter) in batch_formatter.iter().enumerate() {
+                    let cell_data = formatter.value(batch_row).to_string();
+                    // From testing, primitive data types do not typically get larger than 21 characters
+                    if cell_data.len() > MAX_LENGTH_CELL_WITHOUT_MINIMIZE {
+                        let short_cell_data = &cell_data[0..MAX_LENGTH_CELL_WITHOUT_MINIMIZE];
+                        cells.push(format!("
+                            <td style='border: 1px solid black; padding: 8px; text-align: left; white-space: nowrap;'>
+                                <div class=\"expandable-container\">
+                                    <span class=\"expandable\" id=\"{table_uuid}-min-text-{table_row}-{col}\">{short_cell_data}</span>
+                                    <span class=\"full-text\" id=\"{table_uuid}-full-text-{table_row}-{col}\">{cell_data}</span>
+                                    <button class=\"expand-btn\" onclick=\"toggleDataFrameCellText('{table_uuid}',{table_row},{col})\">...</button>
+                                </div>
+                            </td>"));
+                    } else {
+                        cells.push(format!("<td style='border: 1px solid black; padding: 8px; text-align: left; white-space: nowrap;'>{}</td>", formatter.value(batch_row)));
+                    }
                 }
                 let row_str = cells.join("");
                 html_str.push_str(&format!("<tr>{}</tr>\n", row_str));
             }
         }
+        html_str.push_str("</tbody></table></div>\n");
+
+        html_str.push_str("
+            <script>
+            function toggleDataFrameCellText(table_uuid, row, col) {
+                var shortText = document.getElementById(table_uuid + \"-min-text-\" + row + \"-\" + col);
+                var fullText = document.getElementById(table_uuid + \"-full-text-\" + row + \"-\" + col);
+                var button = event.target;
+
+                if (fullText.style.display === \"none\") {
+                    shortText.style.display = \"none\";
+                    fullText.style.display = \"inline\";
+                    button.textContent = \"(less)\";
+                } else {
+                    shortText.style.display = \"inline\";
+                    fullText.style.display = \"none\";
+                    button.textContent = \"...\";
+                }
+            }
+            </script>
+        ");
 
-        html_str.push_str("</table>\n");
+        if has_more {
+            html_str.push_str("Data truncated due to size.");
+        }
 
         Ok(html_str)
     }
@@ -771,3 +871,83 @@ fn record_batch_into_schema(
 
     RecordBatch::try_new(schema, data_arrays)
 }
+
+/// Helper function to collect displayable, non-empty record batches from executing a DataFrame.
+/// It additionally returns a bool, which indicates if there are more record batches available.
+/// We do this so we can determine if we should indicate to the user that the data has been
+/// truncated. This collects until we have achieved both of these conditions:
+///
+/// - We have collected our minimum number of rows
+/// - We have reached our limit, either data size or maximum number of rows
+///
+/// Otherwise it will return when the stream is exhausted. If you want a specific number of
+/// rows, set min_rows == max_rows.
+async fn collect_record_batches_to_display(
+    df: DataFrame,
+    min_rows: usize,
+    max_rows: usize,
+) -> Result<(Vec<RecordBatch>, bool), DataFusionError> {
+    let partitioned_stream = df.execute_stream_partitioned().await?;
+    let mut stream = futures::stream::iter(partitioned_stream).flatten();
+    let mut size_estimate_so_far = 0;
+    let mut rows_so_far = 0;
+    let mut record_batches = Vec::default();
+    let mut has_more = false;
+
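+    // Keep pulling batches while we are under both the size cap and the row cap,
+    // or until the min_rows floor is met; the floor takes priority over the caps.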
+    while (size_estimate_so_far < MAX_TABLE_BYTES_TO_DISPLAY && rows_so_far < max_rows)
+        || rows_so_far < min_rows
+    {
+        let mut rb = match stream.next().await {
+            None => {
+                break;
+            }
+            Some(Ok(r)) => r,
+            Some(Err(e)) => return Err(e),
+        };
+
+        let mut rows_in_rb = rb.num_rows();
+        if rows_in_rb > 0 {
+            size_estimate_so_far += rb.get_array_memory_size();
+
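+            // Trim rows proportionally to fit the size budget: for example, with a
+            // 4 MB estimate against the 2 MB cap, ratio = 0.5, so keep roughly half
+            // of the total rows seen so far (clamped to at least min_rows when possible).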
+            if size_estimate_so_far > MAX_TABLE_BYTES_TO_DISPLAY {
+                let ratio = MAX_TABLE_BYTES_TO_DISPLAY as f32 / size_estimate_so_far as f32;
+                let total_rows = rows_in_rb + rows_so_far;
+
+                let mut reduced_row_num = (total_rows as f32 * ratio).round() as usize;
+                if reduced_row_num < min_rows {
+                    reduced_row_num = min_rows.min(total_rows);
+                }
+
+                let limited_rows_this_rb = reduced_row_num - rows_so_far;
+                if limited_rows_this_rb < rows_in_rb {
+                    rows_in_rb = limited_rows_this_rb;
+                    rb = rb.slice(0, limited_rows_this_rb);
+                    has_more = true;
+                }
+            }
+
+            if rows_in_rb + rows_so_far > max_rows {
+                rb = rb.slice(0, max_rows - rows_so_far);
+                has_more = true;
+            }
+
+            rows_so_far += rb.num_rows();
+            record_batches.push(rb);
+        }
+    }
+
+    if record_batches.is_empty() {
+        return Ok((Vec::default(), false));
+    }
+
+    if !has_more {
+        // Data was not already truncated, so check to see if more record batches remain
+        has_more = match stream.try_next().await {
+            Ok(None) => false, // reached end
+            Ok(Some(_)) => true,
+            Err(_) => false, // Stream disconnected
+        };
+    }
+
+    Ok((record_batches, has_more))
+}
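
As a usage sketch of the min/max semantics described in the doc comment (a hypothetical caller, not part of this change): passing min_rows == max_rows pins the preview to an exact row count, which is what __repr__ does with (df, 10, 10), while _repr_html_ passes usize::MAX for max_rows so only the 2 MB size estimate bounds the output.

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::DataFusionError;
    use datafusion::prelude::DataFrame;

    // Hypothetical caller: preview exactly 10 rows (min_rows == max_rows)
    // and report whether the preview was truncated.
    async fn preview_exact(df: DataFrame) -> Result<Vec<RecordBatch>, DataFusionError> {
        let (batches, has_more) = collect_record_batches_to_display(df, 10, 10).await?;
        if has_more {
            eprintln!("preview truncated; more rows remain");
        }
        Ok(batches)
    }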