@@ -10,7 +10,7 @@ final class PSQLRowStreamTests: XCTestCase {
10
10
let logger = Logger ( label: " PSQLRowStreamTests " )
11
11
let eventLoop = EmbeddedEventLoop ( )
12
12
13
- func testEmptyStream ( ) {
13
+ func testEmptyStreamAndDrainDoesNotThrowErrorAfterConsumption ( ) {
14
14
let stream = PSQLRowStream (
15
15
source: . noRows( . success( . tag( " INSERT 0 1 " ) ) ) ,
16
16
eventLoop: self . eventLoop,
@@ -20,10 +20,9 @@ final class PSQLRowStreamTests: XCTestCase {
20
20
XCTAssertEqual ( try stream. all ( ) . wait ( ) , [ ] )
21
21
XCTAssertEqual ( stream. commandTag, " INSERT 0 1 " )
22
22
23
- // Test 'drain' works in this case
24
23
XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
25
24
}
26
-
25
+
27
26
func testFailedStream( ) {
28
27
let stream = PSQLRowStream (
29
28
source: . noRows( . failure( PSQLError . serverClosedConnection ( underlying: nil ) ) ) ,
@@ -84,37 +83,37 @@ final class PSQLRowStreamTests: XCTestCase {
84
83
)
85
84
XCTAssertEqual ( dataSource. hitDemand, 0 )
86
85
XCTAssertEqual ( dataSource. hitCancel, 0 )
87
-
86
+
88
87
stream. receive ( [
89
88
[ ByteBuffer ( string: " 0 " ) ] ,
90
89
[ ByteBuffer ( string: " 1 " ) ]
91
90
] )
92
-
91
+
93
92
XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
94
-
93
+
95
94
// attach consumer
96
95
let future = stream. all ( )
97
96
XCTAssertEqual ( dataSource. hitDemand, 1 )
98
-
97
+
99
98
stream. receive ( [
100
99
[ ByteBuffer ( string: " 2 " ) ] ,
101
100
[ ByteBuffer ( string: " 3 " ) ]
102
101
] )
103
102
XCTAssertEqual ( dataSource. hitDemand, 2 )
104
-
103
+
105
104
stream. receive ( [
106
105
[ ByteBuffer ( string: " 4 " ) ] ,
107
106
[ ByteBuffer ( string: " 5 " ) ]
108
107
] )
109
108
XCTAssertEqual ( dataSource. hitDemand, 3 )
110
-
109
+
111
110
stream. receive ( completion: . success( " SELECT 2 " ) )
112
-
111
+
113
112
var rows : [ PostgresRow ] ?
114
113
XCTAssertNoThrow ( rows = try future. wait ( ) )
115
114
XCTAssertEqual ( rows? . count, 6 )
116
115
}
117
-
116
+
118
117
func testOnRowAfterStreamHasFinished( ) {
119
118
let dataSource = CountingDataSource ( )
120
119
let stream = PSQLRowStream (
@@ -240,6 +239,84 @@ final class PSQLRowStreamTests: XCTestCase {
240
239
XCTAssertEqual ( stream. commandTag, " SELECT 6 " )
241
240
}
242
241
242
+ func testEmptyStreamDrainsSuccessfully( ) {
243
+ let stream = PSQLRowStream (
244
+ source: . noRows( . success( . tag( " INSERT 0 1 " ) ) ) ,
245
+ eventLoop: self . eventLoop,
246
+ logger: self . logger
247
+ )
248
+
249
+ XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
250
+ XCTAssertEqual ( stream. commandTag, " INSERT 0 1 " )
251
+ }
252
+
253
+ func testDrainAfterStreamHasFinished( ) {
254
+ let dataSource = CountingDataSource ( )
255
+ let stream = PSQLRowStream (
256
+ source: . stream(
257
+ [ self . makeColumnDescription ( name: " foo " , dataType: . text, format: . binary) ] ,
258
+ dataSource
259
+ ) ,
260
+ eventLoop: self . eventLoop,
261
+ logger: self . logger
262
+ )
263
+ XCTAssertEqual ( dataSource. hitDemand, 0 )
264
+ XCTAssertEqual ( dataSource. hitCancel, 0 )
265
+
266
+ stream. receive ( [
267
+ [ ByteBuffer ( string: " 0 " ) ] ,
268
+ [ ByteBuffer ( string: " 1 " ) ]
269
+ ] )
270
+
271
+ XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
272
+ stream. receive ( completion: . success( " SELECT 2 " ) )
273
+
274
+ // attach consumer
275
+ XCTAssertNoThrow ( try stream. drain ( ) . wait ( ) )
276
+ XCTAssertEqual ( dataSource. hitDemand, 0 ) // TODO: Is this right?
277
+ }
278
+
279
+ func testDrainBeforeStreamHasFinished( ) {
280
+ let dataSource = CountingDataSource ( )
281
+ let stream = PSQLRowStream (
282
+ source: . stream(
283
+ [ self . makeColumnDescription ( name: " foo " , dataType: . text, format: . binary) ] ,
284
+ dataSource
285
+ ) ,
286
+ eventLoop: self . eventLoop,
287
+ logger: self . logger
288
+ )
289
+ XCTAssertEqual ( dataSource. hitDemand, 0 )
290
+ XCTAssertEqual ( dataSource. hitCancel, 0 )
291
+
292
+ stream. receive ( [
293
+ [ ByteBuffer ( string: " 0 " ) ] ,
294
+ [ ByteBuffer ( string: " 1 " ) ]
295
+ ] )
296
+
297
+ XCTAssertEqual ( dataSource. hitDemand, 0 , " Before we have a consumer demand is not signaled " )
298
+
299
+ // attach consumer
300
+ let future = stream. drain ( )
301
+ XCTAssertEqual ( dataSource. hitDemand, 1 )
302
+
303
+ stream. receive ( [
304
+ [ ByteBuffer ( string: " 2 " ) ] ,
305
+ [ ByteBuffer ( string: " 3 " ) ]
306
+ ] )
307
+ XCTAssertEqual ( dataSource. hitDemand, 2 )
308
+
309
+ stream. receive ( [
310
+ [ ByteBuffer ( string: " 4 " ) ] ,
311
+ [ ByteBuffer ( string: " 5 " ) ]
312
+ ] )
313
+ XCTAssertEqual ( dataSource. hitDemand, 3 )
314
+
315
+ stream. receive ( completion: . success( " SELECT 2 " ) )
316
+
317
+ XCTAssertNoThrow ( try future. wait ( ) )
318
+ }
319
+
243
320
func makeColumnDescription( name: String , dataType: PostgresDataType , format: PostgresFormat ) -> RowDescription . Column {
244
321
RowDescription . Column (
245
322
name: " test " ,
0 commit comments