Skip to content

chore: add postgres to the samples #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions samples/KafkaFlow.Retry.Common.Sample/Helpers/PostgresHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Npgsql;

namespace KafkaFlow.Retry.Common.Sample.Helpers;

public static class PostgresHelper
{
public static async Task RecreateSqlSchema(string databaseName, string connectionString)
{
await using (var openCon = new NpgsqlConnection(connectionString))
{
openCon.Open();
openCon.ChangeDatabase(databaseName);

var scripts = GetScriptsForSchemaCreation();

foreach (var script in scripts)
{
await using (var queryCommand = new NpgsqlCommand(script))
{
queryCommand.Connection = openCon;

await queryCommand.ExecuteNonQueryAsync();
}
}
}
}

private static IEnumerable<string> GetScriptsForSchemaCreation()
{
var postgresAssembly = Assembly.LoadFrom("KafkaFlow.Retry.Postgres.dll");
return postgresAssembly
.GetManifestResourceNames()
.OrderBy(x => x)
.Select(script =>
{
using (var s = postgresAssembly.GetManifestResourceStream(script))
{
using (var sr = new StreamReader(s))
{
return sr.ReadToEnd();
}
}
})
.ToList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Retry.Postgres\KafkaFlow.Retry.Postgres.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Retry.SqlServer\KafkaFlow.Retry.SqlServer.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Confluent.Kafka;
using KafkaFlow.Configuration;
using KafkaFlow.Retry.MongoDb;
using KafkaFlow.Retry.Postgres;
using KafkaFlow.Retry.Sample.Exceptions;
using KafkaFlow.Retry.Sample.Handlers;
using KafkaFlow.Retry.Sample.Messages;
Expand Down Expand Up @@ -112,8 +113,102 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb(
return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
internal static IClusterConfigurationBuilder SetupRetryDurablePostgres(
this IClusterConfigurationBuilder cluster,
string postgresConnectionString,
string postgresDatabaseName)
{
cluster
.AddProducer(
"kafka-flow-retry-durable-postgres-producer",
producer => producer
.DefaultTopic("sample-kafka-flow-retry-durable-postgres-topic")
.WithCompression(CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
)
.WithAcks(Acks.All)
)
.AddConsumer(
consumer => consumer
.Topic("sample-kafka-flow-retry-durable-postgres-topic")
.WithGroupId("sample-consumer-kafka-flow-retry-durable-postgres")
.WithName("kafka-flow-retry-durable-postgres-consumer")
.WithBufferSize(10)
.WithWorkersCount(20)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetDeserializer>()
.RetryDurable(
configure => configure
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(RetryDurableTestMessage))
.WithPostgresDataProvider(
postgresConnectionString,
postgresDatabaseName)
.WithRetryPlanBeforeRetryDurable(
configure => configure
.TryTimes(3)
.WithTimeBetweenTriesPlan(
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(1000))
.ShouldPauseConsumer(false)
)
.WithEmbeddedRetryCluster(
cluster,
configure => configure
.WithRetryTopicName("sample-kafka-flow-retry-durable-postgres-topic-retry")
.WithRetryConsumerBufferSize(4)
.WithRetryConsumerWorkersCount(2)
.WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<RetryDurableTestHandler>()
)
.Enabled(true)
)
.WithPollingJobsConfiguration(
configure => configure
.WithSchedulerId("retry-durable-postgres-polling-id")
.WithRetryDurablePollingConfiguration(
configure => configure
.WithCronExpression("0 0/1 * 1/1 * ? *")
.WithExpirationIntervalFactor(1)
.WithFetchSize(10)
.Enabled(true)
)
.WithCleanupPollingConfiguration(
configure => configure
.Enabled(false)
.WithCronExpression("0 0/1 * 1/1 * ? *")
)
.WithRetryDurableActiveQueuesCountPollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0 0/1 * 1/1 * ? *")
.Do((numberOfActiveQueues) =>
{
Console.Write($"Number of postgres active queues {numberOfActiveQueues}");
})
)

))
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<RetryDurableTestHandler>())
)
);

return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
this IClusterConfigurationBuilder cluster,
string sqlServerConnectionString,
string sqlServerDatabaseName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.Retry.MongoDb\KafkaFlow.Retry.MongoDb.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Retry.Postgres\KafkaFlow.Retry.Postgres.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Retry.SqlServer\KafkaFlow.Retry.SqlServer.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Retry\KafkaFlow.Retry.csproj" />
<ProjectReference Include="..\KafkaFlow.Retry.Common.Sample\KafkaFlow.Retry.Common.Sample.csproj" />
Expand Down
57 changes: 53 additions & 4 deletions samples/KafkaFlow.Retry.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ private static async Task Main()
var mongoDbRetryQueueItemCollectionName = "RetryQueueItems";
var sqlServerConnectionString = string.Join(
string.Empty,
"Server=localhost;",
"Server=sqlserver.docker.internal;",
"User ID = sa;",
"Password=Finance123.;",
"Trusted_Connection=false;",
"TrustServerCertificate=true;",
"Integrated Security=false;",
Expand All @@ -35,17 +37,29 @@ private static async Task Main()
"Encrypt=false;"
);
var sqlServerDatabaseName = "kafka_flow_retry_durable_sample";
var postgresConnectionString = string.Join(
string.Empty,
"Server=localhost;",
"User Id=postgres;",
"Password=Postgres123123;",
"Port=5432;",
"Application Name=KafkaFlow Retry Tests;"
);
var postgresDatabaseName = "kafka_flow_retry_durable_sample";
var topics = new[]
{
"sample-kafka-flow-retry-simple-topic",
"sample-kafka-flow-retry-forever-topic",
"sample-kafka-flow-retry-durable-sqlserver-topic",
"sample-kafka-flow-retry-durable-sqlserver-topic-retry",
"sample-kafka-flow-retry-durable-mongodb-topic",
"sample-kafka-flow-retry-durable-mongodb-topic-retry"
"sample-kafka-flow-retry-durable-mongodb-topic-retry",
"sample-kafka-flow-retry-durable-postgres-topic",
"sample-kafka-flow-retry-durable-postgres-topic-retry",
};

SqlServerHelper.RecreateSqlSchema(sqlServerDatabaseName, sqlServerConnectionString).GetAwaiter().GetResult();
PostgresHelper.RecreateSqlSchema(postgresDatabaseName, postgresConnectionString).GetAwaiter().GetResult();
KafkaHelper.CreateKafkaTopics(brokers, topics).GetAwaiter().GetResult();

services.AddKafka(
Expand All @@ -65,6 +79,9 @@ private static async Task Main()
.SetupRetryDurableSqlServer(
sqlServerConnectionString,
sqlServerDatabaseName)
.SetupRetryDurablePostgres(
postgresConnectionString,
postgresDatabaseName)
)
);

Expand All @@ -81,7 +98,7 @@ private static async Task Main()

while (true)
{
Console.Write("retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: ");
Console.Write("\nChoose a command:\nretry-simple\nretry-forever\nretry-durable-mongodb\nretry-durable-sqlserver\nretry-durable-postgres\nexit\n: ");
var input = Console.ReadLine().ToLower(CultureInfo.InvariantCulture);

switch (input)
Expand Down Expand Up @@ -149,6 +166,38 @@ await producers["kafka-flow-retry-durable-sqlserver-producer"]
}
break;

case "retry-durable-postgres":
{
Console.Write("Number of the distinct messages to produce: ");
int.TryParse(Console.ReadLine(), out var numOfMessages);
Console.Write("Number of messages with same partition key: ");
int.TryParse(Console.ReadLine(), out var numOfMessagesWithSamePartitionkey);

var messages = Enumerable
.Range(0, numOfMessages)
.SelectMany(
x =>
{
var partitionKey = Guid.NewGuid().ToString();
return Enumerable
.Range(0, numOfMessagesWithSamePartitionkey)
.Select(y => new BatchProduceItem(
"sample-kafka-flow-retry-durable-postgres-topic",
partitionKey,
new RetryDurableTestMessage { Text = $"Message({y}): {Guid.NewGuid()}" },
null))
.ToList();
}
)
.ToList();

await producers["kafka-flow-retry-durable-postgres-producer"]
.BatchProduceAsync(messages)
.ConfigureAwait(false);
Console.WriteLine("Published");
}
break;

case "retry-forever":
{
Console.Write("Number of messages to produce: ");
Expand Down Expand Up @@ -195,7 +244,7 @@ await producers["kafka-flow-retry-simple-producer"]

default:
Console.Write(
"USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver or exit: ");
"USE: retry-simple, retry-forever, retry-durable-mongodb, retry-durable-sqlserver, retry-durable-postgres or exit: ");
break;
}
}
Expand Down
Loading