Skip to content

Commit c22b9ff

Browse files
authored
Nsb10 samples sql sagafinder (#7601)
Update to nsb10
1 parent 24190e4 commit c22b9ff

23 files changed

+543
-2
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
## Prerequisites
2+
3+
### MS SQL Server
4+
5+
include: sql-prereq
6+
7+
### MySQL
8+
9+
Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on `localhost` and port `3306`. A Docker image can be used to accomplish this by running `docker run --name mysql -e 'MYSQL_ROOT_PASSWORD=yourStrong(!)Password' -e 'MYSQL_DATABASE=sqlpersistencesample' -p 3306:3306 -d mysql:latest` in a terminal.
10+
11+
Alternatively, change the connection string to point to different MySQL instance.
12+
13+
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
14+
15+
16+
### PostgreSQL
17+
18+
Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on `localhost` and port `5432`. A Docker image can be used to accomplish this by running `docker run --name postgres -e 'POSTGRES_PASSWORD=yourStrong(!)Password' -e 'POSTGRES_DB=NsbSamplesSqlSagaFinder' -p 5432:5432 -d postgres:latest` in a terminal.
19+
20+
Alternatively, change the connection string to point to different PostgreSQL instance.
21+
22+
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Shared\Shared.csproj" />
9+
<PackageReference Include="MySql.Data" Version="9.*" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.Extensibility;
5+
using NServiceBus.Persistence;
6+
using NServiceBus.Sagas;
7+
8+
#region MySqlFinder
9+
class OrderSagaFinder :
10+
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
11+
{
12+
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
13+
{
14+
return storageSession.GetSagaData<OrderSagaData>(
15+
context: context,
16+
whereClause: "JSON_EXTRACT(Data,'$.PaymentTransactionId') = @propertyValue",
17+
appendParameters: (builder, append) =>
18+
{
19+
var parameter = builder();
20+
parameter.ParameterName = "propertyValue";
21+
parameter.Value = message.PaymentTransactionId;
22+
append(parameter);
23+
});
24+
}
25+
}
26+
#endregion
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Hosting;
4+
using MySql.Data.MySqlClient;
5+
using NServiceBus;
6+
7+
8+
Console.Title = "MySql";
9+
var builder = Host.CreateApplicationBuilder(args);
10+
11+
var endpointConfiguration = new EndpointConfiguration("Samples.SqlSagaFinder.MySql");
12+
endpointConfiguration.UseTransport(new LearningTransport());
13+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
14+
endpointConfiguration.SendFailedMessagesTo("error");
15+
endpointConfiguration.EnableInstallers();
16+
#region MySqlConfig
17+
18+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
19+
var password = Environment.GetEnvironmentVariable("MySqlPassword");
20+
if (string.IsNullOrWhiteSpace(password))
21+
{
22+
throw new Exception("Could not extract 'MySqlPassword' from Environment variables.");
23+
}
24+
var username = Environment.GetEnvironmentVariable("MySqlUserName");
25+
if (string.IsNullOrWhiteSpace(username))
26+
{
27+
throw new Exception("Could not extract 'MySqlUserName' from Environment variables.");
28+
}
29+
var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";
30+
persistence.SqlDialect<SqlDialect.MySql>();
31+
persistence.ConnectionBuilder(
32+
connectionBuilder: () =>
33+
{
34+
return new MySqlConnection(connection);
35+
});
36+
var subscriptions = persistence.SubscriptionSettings();
37+
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
38+
39+
#endregion
40+
41+
builder.UseNServiceBus(endpointConfiguration);
42+
43+
var host = builder.Build();
44+
45+
await host.StartAsync();
46+
47+
var messageSession = host.Services.GetRequiredService<IMessageSession>();
48+
49+
Console.WriteLine("Press 'enter' to send a message");
50+
while (true)
51+
{
52+
var key = Console.ReadKey();
53+
Console.WriteLine();
54+
55+
if (key.Key != ConsoleKey.Enter)
56+
{
57+
break;
58+
}
59+
60+
var startOrder = new StartOrder
61+
{
62+
OrderId = "123"
63+
};
64+
await messageSession.SendLocal(startOrder);
65+
66+
Console.WriteLine($"StartOrder sent: {startOrder.OrderId}");
67+
}
68+
69+
await host.StopAsync();
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<ProjectReference Include="..\Shared\Shared.csproj" />
9+
<PackageReference Include="Npgsql" Version="9.*" />
10+
</ItemGroup>
11+
</Project>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.Extensibility;
5+
using NServiceBus.Persistence;
6+
using NServiceBus.Sagas;
7+
8+
#region PostgreSqlFinder
9+
class OrderSagaFinder :
10+
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
11+
{
12+
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
13+
{
14+
return storageSession.GetSagaData<OrderSagaData>(
15+
context: context,
16+
whereClause: @"""Data""->>'PaymentTransactionId' = @propertyValue",
17+
appendParameters: (builder, append) =>
18+
{
19+
var parameter = builder();
20+
parameter.ParameterName = "propertyValue";
21+
parameter.Value = message.PaymentTransactionId;
22+
append(parameter);
23+
});
24+
}
25+
}
26+
#endregion
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.Hosting;
4+
using Npgsql;
5+
using NpgsqlTypes;
6+
using NServiceBus;
7+
8+
9+
Console.Title = "PostgreSql";
10+
var builder = Host.CreateApplicationBuilder(args);
11+
var endpointConfiguration = new EndpointConfiguration("Samples.SqlSagaFinder.PostgreSql");
12+
endpointConfiguration.UseTransport(new LearningTransport());
13+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
14+
endpointConfiguration.SendFailedMessagesTo("error");
15+
endpointConfiguration.EnableInstallers();
16+
#region PostgreSqlConfig
17+
18+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
19+
var password = Environment.GetEnvironmentVariable("PostgreSqlPassword");
20+
if (string.IsNullOrWhiteSpace(password))
21+
{
22+
throw new Exception("Could not extract 'PostgreSqlPassword' from Environment variables.");
23+
}
24+
var username = Environment.GetEnvironmentVariable("PostgreSqlUserName");
25+
if (string.IsNullOrWhiteSpace(username))
26+
{
27+
throw new Exception("Could not extract 'PostgreSqlUserName' from Environment variables.");
28+
}
29+
var connection = $"Host=localhost;Username={username};Password={password};Database=NsbSamplesSqlSagaFinder";
30+
persistence.TablePrefix("Finder");
31+
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
32+
dialect.JsonBParameterModifier(
33+
modifier: parameter =>
34+
{
35+
var npgsqlParameter = (NpgsqlParameter)parameter;
36+
npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
37+
});
38+
persistence.ConnectionBuilder(
39+
connectionBuilder: () =>
40+
{
41+
return new NpgsqlConnection(connection);
42+
});
43+
44+
var subscriptions = persistence.SubscriptionSettings();
45+
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
46+
47+
#endregion
48+
49+
builder.UseNServiceBus(endpointConfiguration);
50+
51+
52+
var host = builder.Build();
53+
54+
await host.StartAsync();
55+
56+
var messageSession = host.Services.GetRequiredService<IMessageSession>();
57+
58+
Console.WriteLine("Press 'enter' to send a message");
59+
while (true)
60+
{
61+
var key = Console.ReadKey();
62+
Console.WriteLine();
63+
64+
if (key.Key != ConsoleKey.Enter)
65+
{
66+
break;
67+
}
68+
69+
var startOrder = new StartOrder
70+
{
71+
OrderId = "123"
72+
};
73+
await messageSession.SendLocal(startOrder);
74+
75+
Console.WriteLine($"StartOrder sent: {startOrder.OrderId}");
76+
}
77+
78+
await host.StopAsync();
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net10.0</TargetFramework>
4+
<OutputType>Exe</OutputType>
5+
<LangVersion>preview</LangVersion>
6+
</PropertyGroup>
7+
<ItemGroup>
8+
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="4.0.0-alpha.1" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<ProjectReference Include="..\Shared\Shared.csproj" />
12+
</ItemGroup>
13+
</Project>
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.Extensibility;
5+
using NServiceBus.Persistence;
6+
using NServiceBus.Sagas;
7+
8+
#region sqlServerFinder
9+
10+
class OrderSagaFinder :
11+
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
12+
{
13+
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
14+
{
15+
return storageSession.GetSagaData<OrderSagaData>(
16+
context: context,
17+
whereClause: "JSON_VALUE(Data,'$.PaymentTransactionId') = @propertyValue",
18+
appendParameters: (builder, append) =>
19+
{
20+
var parameter = builder();
21+
parameter.ParameterName = "propertyValue";
22+
parameter.Value = message.PaymentTransactionId;
23+
append(parameter);
24+
});
25+
}
26+
}
27+
28+
#endregion
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System;
2+
using Microsoft.Data.SqlClient;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.Hosting;
5+
using NServiceBus;
6+
7+
8+
Console.Title = "SqlServer";
9+
var builder = Host.CreateApplicationBuilder(args);
10+
11+
var endpointConfiguration = new EndpointConfiguration("Samples.SqlSagaFinder.SqlServer");
12+
endpointConfiguration.UseTransport(new LearningTransport());
13+
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
14+
endpointConfiguration.EnableInstallers();
15+
16+
#region sqlServerConfig
17+
18+
//for local instance or SqlExpress
19+
//var connectionString = @"Data Source=(localdb)\mssqllocaldb;Database=NsbSamplesSqlSagaFinder;Trusted_Connection=True;MultipleActiveResultSets=true";
20+
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlSagaFinder;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";
21+
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
22+
persistence.SqlDialect<SqlDialect.MsSqlServer>();
23+
persistence.ConnectionBuilder(
24+
connectionBuilder: () =>
25+
{
26+
return new SqlConnection(connectionString);
27+
});
28+
var subscriptions = persistence.SubscriptionSettings();
29+
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
30+
31+
#endregion
32+
33+
await SqlHelper.EnsureDatabaseExists(connectionString);
34+
35+
builder.UseNServiceBus(endpointConfiguration);
36+
37+
var host = builder.Build();
38+
39+
await host.StartAsync();
40+
41+
var messageSession = host.Services.GetRequiredService<IMessageSession>();
42+
43+
Console.WriteLine("Press 'enter' to send a message");
44+
while (true)
45+
{
46+
var key = Console.ReadKey();
47+
Console.WriteLine();
48+
49+
if (key.Key != ConsoleKey.Enter)
50+
{
51+
break;
52+
}
53+
54+
var startOrder = new StartOrder
55+
{
56+
OrderId = "123"
57+
};
58+
await messageSession.SendLocal(startOrder);
59+
60+
Console.WriteLine($"StartOrder sent: {startOrder.OrderId}");
61+
}
62+
63+
await host.StopAsync();
64+

0 commit comments

Comments
 (0)