Skip to content

Commit c697584

Browse files
dev: ydb topic example & chore changes writer (#282)
1 parent aed3ca9 commit c697584

File tree

9 files changed

+159
-24
lines changed

9 files changed

+159
-24
lines changed

.github/workflows/tests.yml

+6-2
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ jobs:
156156
strategy:
157157
fail-fast: false
158158
matrix:
159-
ydb-version: [ 'latest', '24.1' ]
159+
ydb-version: [ 'latest', 'trunk' ]
160160
dotnet-version: [ 6.0.x, 7.0.x ]
161161
include:
162162
- dotnet-version: 6.0.x
@@ -181,7 +181,7 @@ jobs:
181181
uses: actions/setup-dotnet@v4
182182
with:
183183
dotnet-version: ${{ matrix.dotnet-version }}
184-
- name: Run ADO.NET examples
184+
- name: Run ADO.NET example
185185
run: |
186186
docker cp ydb-local:/ydb_certs/ca.pem ~/
187187
cd ./examples/src/AdoNet
@@ -190,3 +190,7 @@ jobs:
190190
run: |
191191
cd ./examples/src/DapperExample
192192
dotnet run
193+
- name: YDB Topic example
194+
run: |
195+
cd ./examples/src/Topic
196+
dotnet run

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
- Fixed Writer: possible creation of a session after `DisposeAsync()`, which this could happen when there are canceled tasks in `InFlightMessages`.
2+
- Dev: `Writer.MoveNext()` changed exception on cancelToken from `WriterException` to `TaskCanceledException`.
3+
- Dev: changed log level from `Warning` to `Information` in `(Reader / Writer).Initialize()` when it is disposed.
4+
15
## v0.15.0
26
- Dev: added `ValueTask<string?> GetAuthInfoAsync()` in ICredentialProvider.
37
- Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete.

examples/src/Topic/Program.cs

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using Microsoft.Extensions.Logging;
2+
using Ydb.Sdk;
3+
using Ydb.Sdk.Services.Topic;
4+
using Ydb.Sdk.Services.Topic.Reader;
5+
using Ydb.Sdk.Services.Topic.Writer;
6+
7+
const int countMessages = 100;
8+
const string topicName = "topic_name";
9+
10+
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));
11+
12+
var logger = loggerFactory.CreateLogger<Program>();
13+
14+
var config = new DriverConfig(
15+
endpoint: "grpc://localhost:2136",
16+
database: "/local"
17+
);
18+
19+
await using var driver = await Driver.CreateInitialized(
20+
config: config,
21+
loggerFactory: loggerFactory
22+
);
23+
24+
var topicClient = new TopicClient(driver);
25+
26+
await topicClient.CreateTopic(new CreateTopicSettings
27+
{
28+
Path = topicName,
29+
Consumers = { new Consumer("Consumer_Example") }
30+
});
31+
32+
var readerCts = new CancellationTokenSource();
33+
34+
var writerJob = Task.Run(async () =>
35+
{
36+
// ReSharper disable once AccessToDisposedClosure
37+
await using var writer = new WriterBuilder<string>(driver, topicName)
38+
{
39+
ProducerId = "ProducerId_Example"
40+
}.Build();
41+
42+
for (var i = 0; i < countMessages; i++)
43+
{
44+
await writer.WriteAsync($"Message num {i}: Hello Example YDB Topics!");
45+
}
46+
47+
readerCts.CancelAfter(TimeSpan.FromSeconds(3));
48+
});
49+
50+
var readerJob = Task.Run(async () =>
51+
{
52+
// ReSharper disable once AccessToDisposedClosure
53+
await using var reader = new ReaderBuilder<string>(driver)
54+
{
55+
ConsumerName = "Consumer_Example",
56+
SubscribeSettings = { new SubscribeSettings(topicName) }
57+
}.Build();
58+
59+
try
60+
{
61+
while (!readerCts.IsCancellationRequested)
62+
{
63+
var message = await reader.ReadAsync(readerCts.Token);
64+
65+
logger.LogInformation("Received message: [{MessageData}]", message.Data);
66+
67+
try
68+
{
69+
await message.CommitAsync();
70+
}
71+
catch (Exception e)
72+
{
73+
logger.LogError(e, "Failed commit message");
74+
}
75+
}
76+
}
77+
catch (OperationCanceledException)
78+
{
79+
}
80+
});
81+
82+
await writerJob;
83+
await readerJob;
84+
await topicClient.DropTopic(new DropTopicSettings { Path = topicName });

examples/src/Topic/Topic.csproj

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
12+
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
13+
</ItemGroup>
14+
<ItemGroup>
15+
<ProjectReference Include="..\..\..\src\Ydb.Sdk\src\Ydb.Sdk.csproj"/>
16+
</ItemGroup>
17+
</Project>

examples/src/YdbExamples.sln

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DapperExample", "DapperExam
1515
EndProject
1616
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YC", "YC\YC.csproj", "{753E4F33-CB08-47B9-864F-4CC037B278C4}"
1717
EndProject
18+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Topic", "Topic\Topic.csproj", "{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}"
19+
EndProject
1820
Global
1921
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2022
Debug|Any CPU = Debug|Any CPU
@@ -45,6 +47,10 @@ Global
4547
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
4648
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
4749
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.Build.0 = Release|Any CPU
50+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
51+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.Build.0 = Debug|Any CPU
52+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.ActiveCfg = Release|Any CPU
53+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.Build.0 = Release|Any CPU
4854
EndGlobalSection
4955
GlobalSection(SolutionProperties) = preSolution
5056
HideSolutionNode = FALSE

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private async Task Initialize()
9696
{
9797
if (_disposeCts.IsCancellationRequested)
9898
{
99-
_logger.LogWarning("Reader writer is canceled because it has been disposed");
99+
_logger.LogDebug("Initialize Reader[{ReaderConfig}] is stopped because it has been disposed", _config);
100100

101101
return;
102102
}
@@ -335,6 +335,8 @@ private async Task RunProcessingStreamResponse()
335335
throw new ArgumentOutOfRangeException();
336336
}
337337
}
338+
339+
Logger.LogInformation("ReaderSession[{SessionId}]: ResponseStream is closed", SessionId);
338340
}
339341
catch (Driver.TransportException e)
340342
{
@@ -579,6 +581,8 @@ public override async ValueTask DisposeAsync()
579581
{
580582
await _runProcessingStreamRequest;
581583
await Stream.RequestStreamComplete();
584+
Logger.LogInformation("ReaderSession[{SessionId}]: RequestStream is closed", SessionId);
585+
582586
await _runProcessingStreamResponse; // waiting all ack's commits
583587

584588
_lifecycleReaderSessionCts.Cancel();

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ protected async void ReconnectSession()
3838
return;
3939
}
4040

41-
Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
41+
Logger.LogDebug("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
4242

4343
await _initialize();
4444
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

+12-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ internal class Writer<TValue> : IWriter<TValue>
3131
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
3232
private volatile IWriteSession _session = null!;
3333
private volatile int _limitBufferMaxSize;
34+
private volatile bool _isStopped;
3435

3536
internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
3637
{
@@ -52,9 +53,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
5253
{
5354
TaskCompletionSource<WriteResult> tcs = new();
5455
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
55-
() => tcs.TrySetException(
56-
new WriterException("The write operation was canceled before it could be completed")
57-
), useSynchronizationContext: false
56+
() => tcs.TrySetCanceled(), useSynchronizationContext: false
5857
);
5958
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
6059
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
@@ -194,9 +193,9 @@ private async Task Initialize()
194193

195194
try
196195
{
197-
if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty)
196+
if (_isStopped)
198197
{
199-
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
198+
_logger.LogDebug("Initialize Writer[{WriterConfig}] is stopped because it has been disposed", _config);
200199

201200
return;
202201
}
@@ -313,7 +312,6 @@ private async Task Initialize()
313312
}
314313

315314
_session = newSession;
316-
newSession.RunProcessingWriteAck();
317315
WakeUpWorker(); // attempt send buffer
318316
}
319317
finally
@@ -367,6 +365,8 @@ public async ValueTask DisposeAsync()
367365
}
368366
}
369367

368+
_isStopped = true;
369+
370370
await _session.DisposeAsync();
371371

372372
_logger.LogInformation("Writer[{WriterConfig}] is disposed", _config);
@@ -443,6 +443,7 @@ internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer
443443
{
444444
private readonly WriterConfig _config;
445445
private readonly ConcurrentQueue<MessageSending> _inFlightMessages;
446+
private readonly Task _processingResponseStream;
446447

447448
private long _seqNum;
448449

@@ -466,6 +467,8 @@ ConcurrentQueue<MessageSending> inFlightMessages
466467
_config = config;
467468
_inFlightMessages = inFlightMessages;
468469
Volatile.Write(ref _seqNum, lastSeqNo); // happens-before for Volatile.Read
470+
471+
_processingResponseStream = RunProcessingWriteAck();
469472
}
470473

471474
public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
@@ -513,7 +516,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
513516
}
514517
}
515518

516-
internal async void RunProcessingWriteAck()
519+
private async Task RunProcessingWriteAck()
517520
{
518521
try
519522
{
@@ -573,7 +576,7 @@ Completing task on exception...
573576
}
574577
}
575578

576-
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
579+
Logger.LogInformation("WriterSession[{SessionId}]: ResponseStream is closed", SessionId);
577580
}
578581
catch (Driver.TransportException e)
579582
{
@@ -601,6 +604,7 @@ public override async ValueTask DisposeAsync()
601604
Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId);
602605

603606
await Stream.RequestStreamComplete();
607+
await _processingResponseStream;
604608

605609
Stream.Dispose();
606610
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

+24-12
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
358358
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult()
359359
{
360360
var moveTcs = new TaskCompletionSource<bool>();
361+
var moveTcsRetry = new TaskCompletionSource<bool>();
361362

362363
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
363364
.Returns(Task.CompletedTask)
@@ -367,12 +368,17 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
367368
return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled));
368369
})
369370
.Returns(Task.CompletedTask)
370-
.Returns(Task.CompletedTask);
371+
.Returns(() =>
372+
{
373+
moveTcsRetry.SetResult(true);
374+
375+
return Task.CompletedTask;
376+
});
371377
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
372378
.ReturnsAsync(true)
373379
.Returns(new ValueTask<bool>(moveTcs.Task))
374380
.ReturnsAsync(true)
375-
.ReturnsAsync(true)
381+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
376382
.Returns(_lastMoveNext);
377383
_mockStream.SetupSequence(stream => stream.Current)
378384
.Returns(new StreamWriteMessage.Types.FromServer
@@ -569,7 +575,6 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon
569575
public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException()
570576
{
571577
var cancellationTokenSource = new CancellationTokenSource();
572-
var nextCompleted = new TaskCompletionSource<bool>();
573578
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
574579
.Returns(Task.CompletedTask);
575580
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
@@ -582,10 +587,8 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce
582587

583588
var task = writer.WriteAsync(123L, cancellationTokenSource.Token);
584589
cancellationTokenSource.Cancel();
585-
nextCompleted.SetResult(true);
586590

587-
Assert.Equal("The write operation was canceled before it could be completed",
588-
(await Assert.ThrowsAsync<WriterException>(() => task)).Message);
591+
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
589592
}
590593

591594
[Fact]
@@ -638,6 +641,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
638641
var writeTcs2 = new TaskCompletionSource();
639642
var writeTcs3 = new TaskCompletionSource();
640643
var moveTcs = new TaskCompletionSource<bool>();
644+
var moveTcsRetry = new TaskCompletionSource<bool>();
641645

642646
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
643647
.Returns(Task.CompletedTask)
@@ -657,13 +661,17 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
657661
return Task.CompletedTask;
658662
})
659663
.Returns(Task.CompletedTask)
660-
.Returns(Task.CompletedTask);
664+
.Returns(() =>
665+
{
666+
moveTcsRetry.SetResult(true);
667+
return Task.CompletedTask;
668+
});
661669

662670
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
663671
.ReturnsAsync(true)
664672
.Returns(new ValueTask<bool>(moveTcs.Task))
665673
.ReturnsAsync(true)
666-
.ReturnsAsync(true)
674+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
667675
.Returns(_lastMoveNext);
668676
_mockStream.SetupSequence(stream => stream.Current)
669677
.Returns(new StreamWriteMessage.Types.FromServer
@@ -713,8 +721,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
713721

714722
moveTcs.SetResult(false); // Fail write ack stream => start reconnect
715723

716-
Assert.Equal("The write operation was canceled before it could be completed",
717-
(await Assert.ThrowsAsync<WriterException>(() => runTaskWithCancel)).Message);
724+
await Assert.ThrowsAsync<TaskCanceledException>(() => runTaskWithCancel);
718725
Assert.Equal(PersistenceStatus.AlreadyWritten, (await runTask1).Status);
719726
Assert.Equal(PersistenceStatus.Written, (await runTask2).Status);
720727

@@ -863,6 +870,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
863870
{
864871
var tcsDetectedWrite = new TaskCompletionSource();
865872
var writeTcs1 = new TaskCompletionSource<bool>();
873+
var moveTcsRetry = new TaskCompletionSource<bool>();
866874

867875
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
868876
.Returns(Task.CompletedTask)
@@ -872,12 +880,16 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
872880
return Task.CompletedTask;
873881
})
874882
.Returns(Task.CompletedTask)
875-
.Returns(Task.CompletedTask);
883+
.Returns(() =>
884+
{
885+
moveTcsRetry.SetResult(true);
886+
return Task.CompletedTask;
887+
});
876888
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
877889
.ReturnsAsync(true)
878890
.Returns(new ValueTask<bool>(writeTcs1.Task))
879891
.ReturnsAsync(true)
880-
.ReturnsAsync(true)
892+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
881893
.Returns(_lastMoveNext);
882894

883895
_mockStream.SetupSequence(stream => stream.Current)

0 commit comments

Comments
 (0)