@@ -657,6 +657,7 @@ mod tests {
657657 use super :: * ;
658658 use crate :: client:: ByteStream ;
659659 use futures_util:: StreamExt ;
660+ use std:: { collections:: HashMap , io, time} ;
660661
661662 #[ tokio:: test]
662663 async fn read_stream ( ) {
@@ -1029,6 +1030,184 @@ mod tests {
10291030 assert_eq ! ( buffer. as_ref( ) , b"\r \r \r \r \r \r \r \r \r \r \r \r \r " ) ;
10301031 }
10311032
1033+ #[ tokio:: test]
1034+ async fn field_boundary_finding_should_not_cause_busy_exhausted_loop_1 ( ) {
1035+ let ( sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1036+ let stream = StreamFromChannel :: new ( receiver) ;
1037+ send_multipart_message_in_neat_chunks ( sender) ;
1038+ assert_no_busy_exhausted_loop ( stream) . await ;
1039+ }
1040+
1041+ #[ tokio:: test]
1042+ async fn field_boundary_finding_should_not_cause_busy_exhausted_loop_2 ( ) {
1043+ let ( sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1044+ let stream = StreamFromChannel :: new ( receiver) ;
1045+ send_multipart_message_in_not_so_neat_chunks ( sender) ;
1046+ assert_no_busy_exhausted_loop ( stream) . await ;
1047+ }
1048+
1049+ #[ tokio:: test]
1050+ async fn empty_field_data_chunk_should_indicate_end_of_field_1 ( ) {
1051+ let ( sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1052+ let stream = StreamFromChannel :: new ( receiver) ;
1053+ send_multipart_message_in_neat_chunks ( sender) ;
1054+ assert_empty_field_data_chunk_indicates_end_of_field ( stream) . await ;
1055+ }
1056+
1057+ #[ tokio:: test]
1058+ async fn empty_field_data_chunk_should_indicate_end_of_field_2 ( ) {
1059+ let ( sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
1060+ let stream = StreamFromChannel :: new ( receiver) ;
1061+ send_multipart_message_in_not_so_neat_chunks ( sender) ;
1062+ assert_empty_field_data_chunk_indicates_end_of_field ( stream) . await ;
1063+ }
1064+
1065+ fn send_multipart_message_in_neat_chunks (
1066+ sender : tokio:: sync:: mpsc:: UnboundedSender < & ' static [ u8 ] > ,
1067+ ) {
1068+ tokio:: task:: spawn ( async move {
1069+ // Send first chunk of a multipart message. The first chunk ends neatly exactly where the
1070+ // field value of the first field ends.
1071+ sender
1072+ . send (
1073+ b"--abc\r
1074+ Content-Disposition: form-data; name=\" field1\" \r
1075+ \r
1076+ foo" ,
1077+ )
1078+ . unwrap ( ) ;
1079+
1080+ tokio:: time:: sleep ( time:: Duration :: from_millis ( 1 ) ) . await ;
1081+
1082+ // Send the remainder of the message.
1083+ sender
1084+ . send (
1085+ b"\r
1086+ --abc\r
1087+ Content-Disposition: form-data; name=\" field2\" \r
1088+ \r
1089+ bar\r
1090+ --abc--\r
1091+ " ,
1092+ )
1093+ . unwrap ( ) ;
1094+ } ) ;
1095+ }
1096+
1097+ fn send_multipart_message_in_not_so_neat_chunks (
1098+ sender : tokio:: sync:: mpsc:: UnboundedSender < & ' static [ u8 ] > ,
1099+ ) {
1100+ tokio:: task:: spawn ( async move {
1101+ // Send first chunk of a multipart message. The first chunk ends with a carriage return
1102+ // which is not enough to determine the end of the field.
1103+ sender
1104+ . send (
1105+ b"--abc\r
1106+ Content-Disposition: form-data; name=\" field1\" \r
1107+ \r
1108+ foo\r " ,
1109+ )
1110+ . unwrap ( ) ;
1111+
1112+ tokio:: time:: sleep ( time:: Duration :: from_millis ( 1 ) ) . await ;
1113+
1114+ // Send the remainder of the message.
1115+ sender
1116+ . send (
1117+ b"
1118+ --abc\r
1119+ Content-Disposition: form-data; name=\" field2\" \r
1120+ \r
1121+ bar\r
1122+ --abc--\r
1123+ " ,
1124+ )
1125+ . unwrap ( ) ;
1126+ } ) ;
1127+ }
1128+
1129+ async fn assert_no_busy_exhausted_loop ( stream : StreamFromChannel ) {
1130+ let mut fields = HashMap :: new ( ) ;
1131+ let mut empty_data_chunk_count: u8 = 0 ;
1132+ let mut mpart = MultipartStream :: new ( "abc" , stream) ;
1133+ while let Some ( result) = mpart. next ( ) . await {
1134+ let mut field = result. unwrap ( ) ;
1135+
1136+ let mut field_data: Vec < u8 > = Vec :: new ( ) ;
1137+ while let Some ( result) = field. next ( ) . await {
1138+ let field_data_chunk = result. unwrap ( ) ;
1139+
1140+ if field_data_chunk. is_empty ( ) {
1141+ empty_data_chunk_count = empty_data_chunk_count. saturating_add ( 1 ) ;
1142+ continue ;
1143+ }
1144+
1145+ field_data. extend_from_slice ( & field_data_chunk) ;
1146+ }
1147+
1148+ let _ = fields. insert (
1149+ field. name ( ) . unwrap ( ) . to_string ( ) ,
1150+ String :: from_utf8 ( field_data) . unwrap ( ) ,
1151+ ) ;
1152+ }
1153+
1154+ assert_eq ! ( fields. get( "field1" ) , Some ( & String :: from( "foo" ) ) ) ;
1155+ assert_eq ! ( fields. get( "field2" ) , Some ( & String :: from( "bar" ) ) ) ;
1156+ assert_eq ! ( empty_data_chunk_count, 1 ) ;
1157+ }
1158+
1159+ async fn assert_empty_field_data_chunk_indicates_end_of_field ( stream : StreamFromChannel ) {
1160+ let mut fields = HashMap :: new ( ) ;
1161+ let mut empty_data_chunk_count: u8 = 0 ;
1162+ let mut mpart = MultipartStream :: new ( "abc" , stream) ;
1163+ while let Some ( result) = mpart. next ( ) . await {
1164+ let mut field = result. unwrap ( ) ;
1165+
1166+ let mut field_data: Vec < u8 > = Vec :: new ( ) ;
1167+ while let Some ( result) = field. next ( ) . await {
1168+ let field_data_chunk = result. unwrap ( ) ;
1169+
1170+ if field_data_chunk. is_empty ( ) {
1171+ empty_data_chunk_count = empty_data_chunk_count. saturating_add ( 1 ) ;
1172+ break ;
1173+ }
1174+
1175+ field_data. extend_from_slice ( & field_data_chunk) ;
1176+ }
1177+
1178+ let _ = fields. insert (
1179+ field. name ( ) . unwrap ( ) . to_string ( ) ,
1180+ String :: from_utf8 ( field_data) . unwrap ( ) ,
1181+ ) ;
1182+ }
1183+
1184+ assert_eq ! ( fields. get( "field1" ) , Some ( & String :: from( "foo" ) ) ) ;
1185+ assert_eq ! ( fields. get( "field2" ) , Some ( & String :: from( "bar" ) ) ) ;
1186+ assert_eq ! ( empty_data_chunk_count, 1 ) ;
1187+ }
1188+
1189+ struct StreamFromChannel {
1190+ receiver : tokio:: sync:: mpsc:: UnboundedReceiver < & ' static [ u8 ] > ,
1191+ }
1192+
1193+ impl StreamFromChannel {
1194+ fn new ( receiver : tokio:: sync:: mpsc:: UnboundedReceiver < & ' static [ u8 ] > ) -> Self {
1195+ Self { receiver }
1196+ }
1197+ }
1198+
1199+ impl futures_core:: Stream for StreamFromChannel {
1200+ type Item = io:: Result < Bytes > ;
1201+
1202+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
1203+ match self . receiver . poll_recv ( cx) {
1204+ Poll :: Ready ( Some ( buf) ) => Poll :: Ready ( Some ( Ok ( Bytes :: from_static ( buf) ) ) ) ,
1205+ Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
1206+ Poll :: Pending => Poll :: Pending ,
1207+ }
1208+ }
1209+ }
1210+
10321211 #[ test]
10331212 fn test_strip_no_strip_necessary ( ) {
10341213 let name: Cow < str > = Cow :: Owned ( "äöüß.txt" . to_owned ( ) ) ;
0 commit comments