@@ -15,6 +15,7 @@ namespace KafkaFlow.Retry.IntegrationTests
15
15
[ Collection ( "BootstrapperHostCollection" ) ]
16
16
public class EmptyPartitionKeyRetryDurableTests
17
17
{
18
+ private const int defaultWaitingTimeSeconds = 120 ;
18
19
private readonly IRepositoryProvider repositoryProvider ;
19
20
private readonly IServiceProvider serviceProvider ;
20
21
@@ -33,31 +34,27 @@ public static IEnumerable<object[]> EmptyKeyScenarios()
33
34
RepositoryType . MongoDb ,
34
35
typeof ( IMessageProducer < RetryDurableGuaranteeOrderedConsumptionMongoDbProducer > ) ,
35
36
typeof ( RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert ) ,
36
- 2 , //numberOfMessagesToBeProduced
37
- 1 //numberOfMessagesByEachSameKey
37
+ 3 //numberOfMessagesToBeProduced
38
38
} ;
39
39
yield return new object [ ]
40
40
{
41
41
RepositoryType . SqlServer ,
42
42
typeof ( IMessageProducer < RetryDurableGuaranteeOrderedConsumptionSqlServerProducer > ) ,
43
43
typeof ( RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert ) ,
44
- 2 ,
45
- 1
44
+ 3
46
45
} ;
47
46
yield return new object [ ]
48
47
{
49
48
RepositoryType . MongoDb ,
50
49
typeof ( IMessageProducer < RetryDurableLatestConsumptionMongoDbProducer > ) ,
51
50
typeof ( RetryDurableLatestConsumptionPhysicalStorageAssert ) ,
52
- 2 ,
53
51
1
54
52
} ;
55
53
yield return new object [ ]
56
54
{
57
55
RepositoryType . SqlServer ,
58
56
typeof ( IMessageProducer < RetryDurableLatestConsumptionSqlServerProducer > ) ,
59
57
typeof ( RetryDurableLatestConsumptionPhysicalStorageAssert ) ,
60
- 2 ,
61
58
1
62
59
} ;
63
60
}
@@ -68,12 +65,11 @@ internal async Task EmptyKeyRetryDurableTest(
68
65
RepositoryType repositoryType ,
69
66
Type producerType ,
70
67
Type physicalStorageType ,
71
- int numberOfMessagesToBeProduced ,
72
- int numberOfMessagesByEachSameKey )
68
+ int numberOfMessagesToBeProduced )
73
69
{
74
70
// Arrange
71
+ var numberOfMessagesByEachSameKey = 1 ;
75
72
var numberOfTimesThatEachMessageIsTriedWhenDone = 1 ;
76
- var numberOfTimesThatEachMessageIsTriedBeforeDurable = 4 ;
77
73
var numberOfTimesThatEachMessageIsTriedDuringDurable = 1 ;
78
74
var producer = this . serviceProvider . GetRequiredService ( producerType ) as IMessageProducer ;
79
75
var physicalStorageAssert = this . serviceProvider . GetRequiredService ( physicalStorageType ) as IPhysicalStorageAssert ;
@@ -91,27 +87,7 @@ internal async Task EmptyKeyRetryDurableTest(
91
87
await producer . ProduceAsync ( message . Key , message ) . ConfigureAwait ( false ) ;
92
88
}
93
89
94
- RetryDurableTestMessage messageToValidate ;
95
- if ( producer is IMessageProducer < RetryDurableLatestConsumptionSqlServerProducer > || producer is IMessageProducer < RetryDurableLatestConsumptionMongoDbProducer > )
96
- {
97
- messageToValidate = messages [ numberOfMessagesToBeProduced - 1 ] ;
98
-
99
- // Assert - Creation
100
- foreach ( var message in messages )
101
- {
102
- await InMemoryAuxiliarStorage < RetryDurableTestMessage >
103
- . AssertEmptyPartitionKeyCountMessageAsync ( message , numberOfTimesThatEachMessageIsTriedBeforeDurable , 120 )
104
- . ConfigureAwait ( false ) ;
105
- }
106
- }
107
- else
108
- {
109
- messageToValidate = messages [ 0 ] ;
110
-
111
- await InMemoryAuxiliarStorage < RetryDurableTestMessage >
112
- . AssertEmptyPartitionKeyCountMessageAsync ( messageToValidate , numberOfTimesThatEachMessageIsTriedBeforeDurable , 120 )
113
- . ConfigureAwait ( false ) ;
114
- }
90
+ RetryDurableTestMessage messageToValidate = messages [ 0 ] ;
115
91
116
92
await physicalStorageAssert
117
93
. AssertEmptyKeyRetryDurableMessageRetryingAsync ( repositoryType , messageToValidate , numberOfMessagesByEachSameKey )
@@ -121,7 +97,7 @@ await physicalStorageAssert
121
97
InMemoryAuxiliarStorage < RetryDurableTestMessage > . Clear ( ) ;
122
98
123
99
await InMemoryAuxiliarStorage < RetryDurableTestMessage >
124
- . AssertEmptyPartitionKeyCountMessageAsync ( messageToValidate , numberOfTimesThatEachMessageIsTriedDuringDurable , 120 )
100
+ . AssertEmptyPartitionKeyCountMessageAsync ( messageToValidate , numberOfTimesThatEachMessageIsTriedDuringDurable , defaultWaitingTimeSeconds )
125
101
. ConfigureAwait ( false ) ;
126
102
127
103
await physicalStorageAssert
@@ -133,7 +109,7 @@ await physicalStorageAssert
133
109
InMemoryAuxiliarStorage < RetryDurableTestMessage > . Clear ( ) ;
134
110
135
111
await InMemoryAuxiliarStorage < RetryDurableTestMessage >
136
- . AssertEmptyPartitionKeyCountMessageAsync ( messageToValidate , numberOfTimesThatEachMessageIsTriedWhenDone , 120 )
112
+ . AssertEmptyPartitionKeyCountMessageAsync ( messageToValidate , numberOfTimesThatEachMessageIsTriedWhenDone , defaultWaitingTimeSeconds )
137
113
. ConfigureAwait ( false ) ;
138
114
139
115
await physicalStorageAssert
0 commit comments