diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs index ac00d8798..62bbc4f56 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs +++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs @@ -1,4 +1,5 @@ using System.Text; +using Dapr.Messaging.Clients.StreamingClient; using Dapr.Messaging.PublishSubscribe; using Dapr.Messaging.PublishSubscribe.Extensions; @@ -21,12 +22,12 @@ Task HandleMessageAsync(TopicMessage message, CancellationT } } -var messagingClient = app.Services.GetRequiredService(); +var messagingClient = app.Services.GetRequiredService(); //Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic", - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), + new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)), HandleMessageAsync, cancellationTokenSource.Token); await Task.Delay(TimeSpan.FromMinutes(1)); diff --git a/src/Dapr.Common/JsonConverters/RFC3389JsonConverter.cs b/src/Dapr.Common/JsonConverters/RFC3389JsonConverter.cs new file mode 100644 index 000000000..f7599784d --- /dev/null +++ b/src/Dapr.Common/JsonConverters/RFC3389JsonConverter.cs @@ -0,0 +1,80 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Globalization; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Dapr.Common.JsonConverters; + +/// +/// Provides serialization and deserialization between a and its RFC3389 format. +/// +internal sealed class Rfc3389JsonConverter : JsonConverter +{ + private const string Rfc3389Format = "yyyy-MM-dd'T'HH:mm:ss.fffK"; + + /// Reads and converts the JSON to type. + /// The reader. + /// The type to convert. + /// An object that specifies serialization options to use. + /// The converted value. + public override DateTimeOffset? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var stringValue = reader.GetString(); + if (stringValue is null) + { + return null; + } + + if (string.IsNullOrWhiteSpace(stringValue)) + { + throw new JsonException("The data string is empty or whitespace and cannot be converted to a DateTimeOffset."); + } + + try + { + return DateTimeOffset.ParseExact(stringValue, Rfc3389Format, CultureInfo.InvariantCulture, + DateTimeStyles.AdjustToUniversal); + } + catch (FormatException ex) + { + throw new JsonException($"The date string '{stringValue}' is not in the expected RFC3389 format.", ex); + } + } + + /// Writes a specified value as JSON. + /// The writer to write to. + /// The value to convert to JSON. + /// An object that specifies serialization options to use. + public override void Write(Utf8JsonWriter writer, DateTimeOffset? value, JsonSerializerOptions options) + { + try + { + if (value is null) + { + writer.WriteNullValue(); + } + else + { + var dateString = ((DateTimeOffset)value).ToString(Rfc3389Format, CultureInfo.InvariantCulture); + var targetValue = dateString.Replace("+00:00", "Z").Trim('"'); + writer.WriteStringValue(targetValue); + } + } + catch (Exception ex) + { + throw new JsonException("An error occurred while writing the DateTimeOffset value.", ex); + } + } +} diff --git a/src/Dapr.Common/Serialization/TypeConverters.cs b/src/Dapr.Common/Serialization/TypeConverters.cs new file mode 100644 index 000000000..1a899f9ea --- /dev/null +++ b/src/Dapr.Common/Serialization/TypeConverters.cs @@ -0,0 +1,48 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; +using Google.Protobuf; + +namespace Dapr.Common.Serialization; + +/// +/// Converters used to serialize and deserialize data. +/// +internal static class TypeConverters +{ + /// + /// Converts an arbitrary type to a -based . + /// + /// The data to convert. + /// The JSON serialization options. + /// The type of the given data. + /// The given data as a JSON-based byte string. + internal static ByteString ToJsonByteString(T data, JsonSerializerOptions options) + { + var bytes = JsonSerializer.SerializeToUtf8Bytes(data, options); + return ByteString.CopyFrom(bytes); + } + + /// + /// Deserializes a -based to an arbitrary type. + /// + /// The data to convert. + /// The JSON serialization options. + /// The type of the data to deserialize to. + /// The strongly-typed deserialized data. + internal static T? FromJsonByteString(ByteString data, JsonSerializerOptions options) where T : class + { + return data.Length == 0 ? null : JsonSerializer.Deserialize(data.Span, options); + } +} diff --git a/src/Dapr.Messaging/PublishSubscribe/IDaprPubSubBuilder.cs b/src/Dapr.Messaging/Clients/IDaprPubSubBuilder.cs similarity index 94% rename from src/Dapr.Messaging/PublishSubscribe/IDaprPubSubBuilder.cs rename to src/Dapr.Messaging/Clients/IDaprPubSubBuilder.cs index 23e6df1f6..bb6ff5df5 100644 --- a/src/Dapr.Messaging/PublishSubscribe/IDaprPubSubBuilder.cs +++ b/src/Dapr.Messaging/Clients/IDaprPubSubBuilder.cs @@ -11,7 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// Provides a Dapr client builder specific for Publish/Subscribe operations. diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/CloudEvent.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/CloudEvent.cs new file mode 100644 index 000000000..b8778e01a --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/CloudEvent.cs @@ -0,0 +1,70 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json.Serialization; +using Dapr.Common.JsonConverters; +using Dapr.Messaging.JsonConverters; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Represents a CloudEvent without data. +/// +/// The context in which an event happened, e.g. the type of an event source. +/// Describes the type of event related to the originating occurrence. +public record CloudEvent( + [property: JsonPropertyName("source")] Uri Source, + [property: JsonPropertyName("type")] string Type) +{ + /// + /// The subject of the event in the context of the event producer (identified by ). + /// + [JsonPropertyName("subject")] + public string? Subject { get; init; } + + /// + /// The version of the CloudEvents specification which the event uses. + /// + /// + /// While this SDK implements specification 1.0.2, this value only has the major and minor values included allowing + /// for "patch" changes that don't change this property's value in the serialization. + /// + [JsonPropertyName("specversion")] + public string SpecVersion => "1.0"; + + /// + /// The timestamp of when the occurrence happened. + /// + [JsonPropertyName("time")] + [JsonConverter(typeof(Rfc3389JsonConverter))] + public DateTimeOffset? Time { get; init; } = null; +} + +/// +/// Represents a CloudEvent with typed data. +/// +/// The context in which an event happened, e.g. the type of an event source. +/// Describes the type of event related to the originating occurrence. +/// Domain-specific information about the event occurrence. +[JsonConverter(typeof(CloudEventDataJsonSerializer<>))] +public record CloudEvent( + Uri Source, + string Type, + [property: JsonPropertyName("data")] TData Data) : CloudEvent(Source, Type) +{ + /// + /// Content type of the data value. + /// + [JsonPropertyName("datacontenttype")] + public string DataContentType { get; init; } = "application/json"; +} diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishRequest.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishRequest.cs new file mode 100644 index 000000000..3952d8472 --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishRequest.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// Information about the type being published via the bulk publish operation. +/// +/// The data to serialize in the event. +/// The optional data content type. This defaults to "application/json" is not set. +/// The type to serialize. +public sealed record DaprBulkPublishRequest(TValue Payload, string DataContentType = "application/json"); diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponse.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponse.cs new file mode 100644 index 000000000..4df69c709 --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponse.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Messaging.PublishSubscribe; + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// Represents the responses returned for failed bulk publishing events. +/// +/// The list of entries that failed to be published. +public record DaprBulkPublishResponse(IReadOnlyList FailedEntries); diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponseFailedEntry.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponseFailedEntry.cs new file mode 100644 index 000000000..3cbb3b32f --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprBulkPublishResponseFailedEntry.cs @@ -0,0 +1,23 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// Represents the status of each event that was published during BulkPublishRequest. +/// +/// The entry that failed to be published. +/// The error message stating why the entry failed to publish. +public record DaprBulkPublishResponseFailedEntry(BulkPublishRequestEntry Entry, string ErrorMessage); diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticClient.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticClient.cs new file mode 100644 index 000000000..e0b42d6ba --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticClient.cs @@ -0,0 +1,144 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json;using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// The base implementation of a programmatic pub/sub client. +/// +public abstract class DaprPubSubProgrammaticClient(Autogenerated.Dapr.DaprClient client, HttpClient httpClient, JsonSerializerOptions jsonSerializerOptions, string? daprApiToken = null) : IDaprPubSubProgrammaticClient +{ + private bool disposed; + + /// + /// The HTTP client used by the client for calling the Dapr runtime. + /// + /// + /// Property exposed for testing purposes. + /// + internal protected readonly HttpClient HttpClient = httpClient; + + /// + /// The Dapr API token value. + /// + /// + /// Property exposed for testing purposes. + /// + internal protected readonly string? DaprApiToken = daprApiToken; + + /// + /// The autogenerated Dapr client. + /// + /// + /// Property exposed for testing purposes. + /// + internal protected readonly Autogenerated.Dapr.DaprClient Client = client; + + /// + /// The JSON serializer options. + /// + /// + /// Property exposed for testing purposes. + /// + internal protected JsonSerializerOptions JsonSerializerOptions = jsonSerializerOptions; + + /// + /// Publishes an event to the specified topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to publish to. + /// The optional data that will be JSON serialized and provided as the event payload. + /// A collection of optional metadata key/value pairs that will be provided to the component. The valid + /// metadata keys and values are determined by the type of PubSub component used. + /// Cancellation token used to cancel the operation. + /// The type of data that will be JSON serialized and provided as the event payload. + public abstract Task PublishEventAsync( + string pubSubName, + string topicName, + TData? data = null, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) where TData : class; + + /// + /// Bulk-publishes multiple events to the specified topic at once. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to publish to. + /// The collection of data that will be JSON serialized and provided as the event payload. + /// A collection of optional metadata key/value pairs that will be provided to the component. The valid + /// metadata keys and values are determined by the type of PubSub component used. + /// Cancellation token used to cancel the operation. + /// The type of data that will be JSON serialized and provided as the event payload. + public abstract Task PublishEventAsync( + string pubSubName, + string topicName, + IReadOnlyList data, + Dictionary? metadata = null, + CancellationToken cancellationToken = default); + + /// + /// Publishes an event with a byte-based payload to the specified topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic to publish to. + /// The raw byte data used as the event payload. + /// The content type of the given bytes. This defaults to "application/json". + /// A collection of optional metadata key/value pairs that will be provided to the component. The valid + /// metadata keys and values are determined by the type of PubSub component used. + /// Cancellation token used to cancel the operation. + public abstract Task PublishEventAsync( + string pubSubName, + string topicName, + ReadOnlyMemory data, + string dataContentType = "application/json", + Dictionary? metadata = null, + CancellationToken cancellationToken = default); + + /// + /// // Bulk Publishes multiple events to the specified topic. + /// + /// The name of the Publish/Subscribe component. + /// The name of the topic the request should be published to. + /// The list of events to be serialized and ublished. + /// A collection of optional metadata key/value pairs that will be provided to the component. + /// The valid metadata keys and values are determined by the type of PubSub component used. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed with a list of error + /// messages accompanying any failed requests. + public abstract Task BulkPublishEventAsync( + string pubSubName, + string topicName, + IReadOnlyList events, + Dictionary? metadata = null, + CancellationToken cancellationToken = default); + + /// + public void Dispose() + { + if (!this.disposed) + { + Dispose(disposing: true); + this.disposed = true; + } + } + + /// + /// Disposes the resources associated with the object. + /// + /// true if called by a call to the Dispose method; otherwise false. + protected virtual void Dispose(bool disposing) + { + } +} diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticGrpcClient.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticGrpcClient.cs new file mode 100644 index 000000000..c88e9facd --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/DaprPubSubProgrammaticGrpcClient.cs @@ -0,0 +1,195 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; +using Dapr.Common; +using Dapr.Common.Serialization; +using Dapr.Messaging.Clients.StreamingClient; +using Dapr.Messaging.PublishSubscribe; +using Google.Protobuf; +using Grpc.Core; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// A client for interacting with the Dapr endpoints. +/// +internal sealed class DaprPubSubProgrammaticGrpcClient( + Autogenerated.Dapr.DaprClient client, + HttpClient httpClient, + JsonSerializerOptions jsonSerializerOptions, + string? daprApiToken = null) : DaprPubSubProgrammaticClient(client, httpClient, jsonSerializerOptions, daprApiToken) +{ + /// + public override async Task PublishEventAsync( + string pubSubName, + string topicName, + TData? data = null, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) where TData : class + { + ArgumentException.ThrowIfNullOrEmpty(pubSubName, nameof(pubSubName)); + ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + + var payload = data is null ? null : TypeConverters.ToJsonByteString(data, JsonSerializerOptions); + await MakePublishRequestAsync(pubSubName, topicName, payload, cancellationToken: cancellationToken); + } + + /// + public override async Task PublishEventAsync( + string pubSubName, + string topicName, + IReadOnlyList data, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(pubSubName, nameof(pubSubName)); + ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + + var payload = TypeConverters.ToJsonByteString(data, JsonSerializerOptions); + await MakePublishRequestAsync(pubSubName, topicName, payload, cancellationToken: cancellationToken); + } + + /// + public override async Task PublishEventAsync( + string pubSubName, + string topicName, + ReadOnlyMemory data, + string dataContentType = "application/json", + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(pubSubName, nameof(pubSubName)); + ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + + await MakePublishRequestAsync(pubSubName, topicName, ByteString.CopyFrom(data.Span), dataContentType, metadata, + cancellationToken); + } + + /// + public override Task BulkPublishEventAsync( + string pubsubName, + string topicName, + IReadOnlyList events, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); + ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + ArgumentVerifier.ThrowIfNull(events, nameof(events)); + var pubSubEvents = events + .Select(ev => new DaprBulkPublishRequest(ev)) + .ToList(); + + return MakePublishRequestAsync(pubsubName, topicName, pubSubEvents, metadata, cancellationToken); + } + + private async Task MakePublishRequestAsync( + string pubSubName, + string topicName, + ByteString? payload = null, + string? dataContentType = null, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.PublishEventRequest { PubsubName = pubSubName, Topic = topicName }; + + if (payload is not null) + { + envelope.Data = payload; + envelope.DataContentType = dataContentType; + } + + if (metadata is not null) + { + foreach (var kvp in metadata) + { + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + var grpcCallOptions = + DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprPubSubStreamingGrpcClient).Assembly, DaprApiToken, + cancellationToken); + + try + { + await Client.PublishEventAsync(envelope, grpcCallOptions); + } + catch (RpcException ex) + { + throw new DaprException( + "Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + private async Task MakePublishRequestAsync( + string pubSubName, + string topicName, + IReadOnlyList> events, + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.BulkPublishRequest { PubsubName = pubSubName, Topic = topicName }; + var entryMap = new Dictionary(); + + for (var counter = 0; counter < events.Count; counter++) + { + var entry = new Autogenerated.BulkPublishRequestEntry + { + EntryId = counter.ToString(), + Event = TypeConverters.ToJsonByteString(events[counter], JsonSerializerOptions), + ContentType = events[counter].DataContentType + }; + + //Add the metadata to each entry + if (metadata is not null) + { + foreach (var kvp in metadata) + { + entry.Metadata.Add(kvp.Key, kvp.Value); + } + } + envelope.Entries.Add(entry); + entryMap.Add(counter.ToString(), entry); + } + + //Add the metadata to the outer request as well + if (metadata is not null) + { + foreach (var kvp in metadata) + { + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + var grpcCallOptions = + DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprPubSubStreamingGrpcClient).Assembly, DaprApiToken, + cancellationToken); + try + { + var response = await Client.BulkPublishEventAlpha1Async(envelope, grpcCallOptions); + + var bulkPublishResponse = new DaprBulkPublishResponse(response.FailedEntries + .Select(entry => new DaprBulkPublishResponseFailedEntry(entryMap[entry.EntryId], entry.Error)) + .ToList()); + return bulkPublishResponse; + } + catch (RpcException ex) + { + throw new DaprException( + "Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } +} diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticBuilder.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticBuilder.cs new file mode 100644 index 000000000..8d55746a7 --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticBuilder.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Messaging.Clients.StreamingClient; + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// Provides a Dapr client builder specific for programmatic Publish/Subscribe operations. +/// +public interface IDaprPubSubProgrammaticBuilder : IDaprPubSubBuilder; diff --git a/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticClient.cs b/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticClient.cs new file mode 100644 index 000000000..6e0cea924 --- /dev/null +++ b/src/Dapr.Messaging/Clients/ProgrammaticClient/IDaprPubSubProgrammaticClient.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common; + +namespace Dapr.Messaging.Clients.ProgrammaticClient; + +/// +/// Represents a programmatic pub/sub client. +/// +public interface IDaprPubSubProgrammaticClient : IDaprClient; diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClient.cs similarity index 74% rename from src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs rename to src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClient.cs index cbf25ea49..fb184c7d8 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClient.cs @@ -11,15 +11,15 @@ // limitations under the License. // ------------------------------------------------------------------------ -using Dapr.Common; -using P = Dapr.Client.Autogen.Grpc.v1; +using System.Text.Json; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// -/// The base implementation of a Dapr pub/sub client. +/// The base implementation of a Dapr streaming pub/sub client. /// -public abstract class DaprPublishSubscribeClient(P.Dapr.DaprClient client, HttpClient httpClient, string? daprApiToken = null) : IDaprClient +public abstract class DaprPubSubStreamingClient(Autogenerated.Dapr.DaprClient client, HttpClient httpClient, JsonSerializerOptions jsonSerializerOptions, string? daprApiToken = null) : IDaprPubSubStreamingClient { private bool disposed; @@ -45,7 +45,15 @@ public abstract class DaprPublishSubscribeClient(P.Dapr.DaprClient client, HttpC /// /// Property exposed for testing purposes. /// - internal protected readonly P.Dapr.DaprClient Client = client; + internal protected readonly Autogenerated.Dapr.DaprClient Client = client; + + /// + /// The JSON serializer options. + /// + /// + /// Property exposed for testing purposes. + /// + internal protected JsonSerializerOptions JsonSerializerOptions = jsonSerializerOptions; /// /// Dynamically subscribes to a Publish/Subscribe component and topic. @@ -56,7 +64,7 @@ public abstract class DaprPublishSubscribeClient(P.Dapr.DaprClient client, HttpC /// The delegate reflecting the action to take upon messages received by the subscription. /// Cancellation token. /// - public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default); + public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprStreamingSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default); /// public void Dispose() diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClientBuilder.cs similarity index 73% rename from src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs rename to src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClientBuilder.cs index 691ff9d38..482c120b9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingClientBuilder.cs @@ -15,13 +15,13 @@ using Microsoft.Extensions.Configuration; using Autogenerated = Dapr.Client.Autogen.Grpc.v1; -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// -/// Builds a . +/// Builds a . /// /// An optional instance of . -public sealed class DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : DaprGenericClientBuilder(configuration) +public sealed class DaprPubSubStreamingClientBuilder(IConfiguration? configuration = null) : DaprGenericClientBuilder(configuration) { /// /// Builds the client instance from the properties of the builder. @@ -30,10 +30,10 @@ public sealed class DaprPublishSubscribeClientBuilder(IConfiguration? configurat /// /// Builds the client instance from the properties of the builder. /// - public override DaprPublishSubscribeClient Build() + public override DaprPubSubStreamingClient Build() { - var daprClientDependencies = BuildDaprClientDependencies(typeof(DaprPublishSubscribeClient).Assembly); + var daprClientDependencies = BuildDaprClientDependencies(typeof(DaprPubSubStreamingClient).Assembly); var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel); - return new DaprPublishSubscribeGrpcClient(client, daprClientDependencies.httpClient, daprClientDependencies.daprApiToken); + return new DaprPubSubStreamingGrpcClient(client, daprClientDependencies.httpClient, JsonSerializerOptions, daprClientDependencies.daprApiToken); } } diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingGrpcClient.cs similarity index 81% rename from src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs rename to src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingGrpcClient.cs index ace670df3..8853151c9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/DaprPubSubStreamingGrpcClient.cs @@ -11,17 +11,19 @@ // limitations under the License. // ------------------------------------------------------------------------ -using P = Dapr.Client.Autogen.Grpc.v1.Dapr; +using System.Text.Json; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// A client for interacting with the Dapr endpoints. /// -internal sealed class DaprPublishSubscribeGrpcClient( - P.DaprClient client, +internal sealed class DaprPubSubStreamingGrpcClient( + Autogenerated.Dapr.DaprClient client, HttpClient httpClient, - string? daprApiToken = null) : DaprPublishSubscribeClient(client, httpClient, daprApiToken) + JsonSerializerOptions jsonSerializerOptions, + string? daprApiToken = null) : DaprPubSubStreamingClient(client, httpClient, jsonSerializerOptions, daprApiToken) { /// /// Dynamically subscribes to a Publish/Subscribe component and topic. @@ -35,7 +37,7 @@ internal sealed class DaprPublishSubscribeGrpcClient( public override async Task SubscribeAsync( string pubSubName, string topicName, - DaprSubscriptionOptions options, + DaprStreamingSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default) { @@ -44,6 +46,8 @@ public override async Task SubscribeAsync( return receiver; } + + /// protected override void Dispose(bool disposing) { diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/Clients/StreamingClient/DaprStreamingSubscriptionOptions.cs similarity index 92% rename from src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs rename to src/Dapr.Messaging/Clients/StreamingClient/DaprStreamingSubscriptionOptions.cs index 73838b605..3087cefb9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/DaprStreamingSubscriptionOptions.cs @@ -11,13 +11,13 @@ // limitations under the License. // ------------------------------------------------------------------------ -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// Options used to configure the dynamic Dapr subscription. /// /// Describes the policy to take on messages that have not been acknowledged within the timeout period. -public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy) +public sealed record DaprStreamingSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy) { /// /// Subscription metadata. diff --git a/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingBuilder.cs b/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingBuilder.cs new file mode 100644 index 000000000..093a495e1 --- /dev/null +++ b/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingBuilder.cs @@ -0,0 +1,19 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.Clients.StreamingClient; + +/// +/// Provides a Dapr client builder specific for Publish/Subscribe operations. +/// +public interface IDaprPubSubStreamingBuilder : IDaprPubSubBuilder; diff --git a/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingClient.cs b/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingClient.cs new file mode 100644 index 000000000..a5cb38ce6 --- /dev/null +++ b/src/Dapr.Messaging/Clients/StreamingClient/IDaprPubSubStreamingClient.cs @@ -0,0 +1,21 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Common; + +namespace Dapr.Messaging.Clients.StreamingClient; + +/// +/// Represents a PubSub streaming client. +/// +public interface IDaprPubSubStreamingClient : IDaprClient; diff --git a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs b/src/Dapr.Messaging/Clients/StreamingClient/MessageHandlingPolicy.cs similarity index 96% rename from src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs rename to src/Dapr.Messaging/Clients/StreamingClient/MessageHandlingPolicy.cs index de6882095..9d156cbd9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/MessageHandlingPolicy.cs @@ -12,7 +12,7 @@ // ------------------------------------------------------------------------ -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings. diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/Clients/StreamingClient/PublishSubscribeReceiver.cs similarity index 98% rename from src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs rename to src/Dapr.Messaging/Clients/StreamingClient/PublishSubscribeReceiver.cs index 4b0d608ff..ed0f95029 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/PublishSubscribeReceiver.cs @@ -16,7 +16,7 @@ using Grpc.Core; using P = Dapr.Client.Autogen.Grpc.v1; -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// A thread-safe implementation of a receiver for messages from a specified Dapr publish/subscribe component and @@ -43,7 +43,7 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable /// /// Options allowing the behavior of the receiver to be configured. /// - private readonly DaprSubscriptionOptions options; + private readonly DaprStreamingSubscriptionOptions options; /// /// A channel used to decouple the messages received from the sidecar to their consumption. /// @@ -90,7 +90,7 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable /// Options allowing the behavior of the receiver to be configured. /// The delegate reflecting the action to take upon messages received by the subscription. /// A reference to the DaprClient instance. - internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler handler, P.Dapr.DaprClient client) + internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprStreamingSubscriptionOptions options, TopicMessageHandler handler, P.Dapr.DaprClient client) { this.client = client; this.pubSubName = pubSubName; diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/Clients/StreamingClient/TopicMessage.cs similarity index 97% rename from src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs rename to src/Dapr.Messaging/Clients/StreamingClient/TopicMessage.cs index 402a89e9f..c38ec45a4 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/TopicMessage.cs @@ -13,7 +13,7 @@ using Google.Protobuf.WellKnownTypes; -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// A message retrieved from a Dapr publish/subscribe topic. diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs b/src/Dapr.Messaging/Clients/StreamingClient/TopicMessageHandler.cs similarity index 96% rename from src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs rename to src/Dapr.Messaging/Clients/StreamingClient/TopicMessageHandler.cs index 65b7abf01..f50efb471 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/TopicMessageHandler.cs @@ -11,7 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// The handler delegate responsible for processing the topic message. diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs b/src/Dapr.Messaging/Clients/StreamingClient/TopicResponseAction.cs similarity index 96% rename from src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs rename to src/Dapr.Messaging/Clients/StreamingClient/TopicResponseAction.cs index 5a34f4cc2..d40fc801d 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs +++ b/src/Dapr.Messaging/Clients/StreamingClient/TopicResponseAction.cs @@ -11,7 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ -namespace Dapr.Messaging.PublishSubscribe; +namespace Dapr.Messaging.Clients.StreamingClient; /// /// Describes the various actions that can be taken on a topic message. diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/DaprPubSubBuilder.cs b/src/Dapr.Messaging/Extensions/DaprPubSubBuilder.cs similarity index 96% rename from src/Dapr.Messaging/PublishSubscribe/Extensions/DaprPubSubBuilder.cs rename to src/Dapr.Messaging/Extensions/DaprPubSubBuilder.cs index 2772df2fd..314b6e7ed 100644 --- a/src/Dapr.Messaging/PublishSubscribe/Extensions/DaprPubSubBuilder.cs +++ b/src/Dapr.Messaging/Extensions/DaprPubSubBuilder.cs @@ -11,6 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ +using Dapr.Messaging.Clients.StreamingClient; using Microsoft.Extensions.DependencyInjection; namespace Dapr.Messaging.PublishSubscribe.Extensions; diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/Extensions/PublishSubscribeServiceCollectionExtensions.cs similarity index 70% rename from src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs rename to src/Dapr.Messaging/Extensions/PublishSubscribeServiceCollectionExtensions.cs index 954940e53..ff68d3d9d 100644 --- a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs +++ b/src/Dapr.Messaging/Extensions/PublishSubscribeServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ using Dapr.Common.Extensions; +using Dapr.Messaging.Clients.StreamingClient; using Microsoft.Extensions.DependencyInjection; namespace Dapr.Messaging.PublishSubscribe.Extensions; @@ -12,13 +13,13 @@ public static class PublishSubscribeServiceCollectionExtensions /// Adds Dapr Publish/Subscribe support to the service collection. /// /// The . - /// Optionally allows greater configuration of the using injected services. + /// Optionally allows greater configuration of the using injected services. /// The lifetime of the registered services. /// public static IDaprPubSubBuilder AddDaprPubSubClient( this IServiceCollection services, - Action? configure = null, + Action? configure = null, ServiceLifetime lifetime = ServiceLifetime.Singleton) => - services.AddDaprClient( + services.AddDaprClient( configure, lifetime); } diff --git a/src/Dapr.Messaging/JsonConverters/CloudEventDataJsonSerializer.cs b/src/Dapr.Messaging/JsonConverters/CloudEventDataJsonSerializer.cs new file mode 100644 index 000000000..798b7a770 --- /dev/null +++ b/src/Dapr.Messaging/JsonConverters/CloudEventDataJsonSerializer.cs @@ -0,0 +1,70 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; +using System.Text.Json.Serialization; +using Dapr.Common.JsonConverters; +using Dapr.Messaging.PublishSubscribe; + +namespace Dapr.Messaging.JsonConverters; + +internal sealed class CloudEventDataJsonSerializer : JsonConverter> +{ + /// Reads and converts the JSON to type . + /// The reader. + /// The type to convert. + /// An object that specifies serialization options to use. + /// The converted value. + public override CloudEvent? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } + + /// Writes a specified value as JSON. + /// The writer to write to. + /// The value to convert to JSON. + /// An object that specifies serialization options to use. + public override void Write(Utf8JsonWriter writer, CloudEvent value, JsonSerializerOptions options) + { + writer.WriteStartObject(); + + writer.WriteString("source", value.Source.ToString()); + writer.WriteString("type", value.Type); + writer.WriteString("specversion", value.SpecVersion); + + if (value.Subject is not null) + { + writer.WriteString("subject", value.Subject); + } + + if (value.Time is not null) + { + options.Converters.Add(new Rfc3389JsonConverter()); + var serializedTime = JsonSerializer.Serialize(value.Time, options).Trim('"'); + + writer.WriteString("time", serializedTime); + } + + if (value.DataContentType == "application/json") + { + writer.WritePropertyName("Data"); + JsonSerializer.Serialize(writer, value.Data, options); + } + else + { + writer.WriteString("Data", value.Data?.ToString()); + } + + writer.WriteEndObject(); + } +} diff --git a/test/Dapr.Common.Test/JsonConverters/Rfc3389JsonConverterTest.cs b/test/Dapr.Common.Test/JsonConverters/Rfc3389JsonConverterTest.cs new file mode 100644 index 000000000..62ad9ba3b --- /dev/null +++ b/test/Dapr.Common.Test/JsonConverters/Rfc3389JsonConverterTest.cs @@ -0,0 +1,67 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Globalization; +using System.Text; +using System.Text.Json; +using Dapr.Common.JsonConverters; +using Xunit; + +namespace Dapr.Common.Test.JsonConverters; + +public sealed class Rfc3389JsonConverterTest +{ + private readonly Rfc3389JsonConverter _converter = new(); + + [Fact] + public void Read_ShouldReturnNull_WhenStringValueIsNull() + { + const string json = "null"; + var reader = new Utf8JsonReader(Encoding.UTF8.GetBytes(json)); + reader.Read(); + + var result = _converter.Read(ref reader, typeof(DateTimeOffset?), new JsonSerializerOptions()); + + Assert.Null(result); + } + + [Fact] + public void Write_ShouldWriteNullValue_WhenValueIsNull() + { + var options = new JsonSerializerOptions(); + using var stream = new System.IO.MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + _converter.Write(writer, null, options); + writer.Flush(); + + var json = System.Text.Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal("null", json); + } + + [Fact] + public void Write_ShouldWriteStringValue_WhenValueIsNotNull() + { + var options = new JsonSerializerOptions(); + using var stream = new System.IO.MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + var dateTimeOffset = DateTimeOffset.ParseExact("2025-04-13T06:35:22.000Z", "yyyy-MM-dd'T'HH:mm:ss.fffK", CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal); + _converter.Write(writer, dateTimeOffset, options); + writer.Flush(); + + var json = Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal("\"2025-04-13T06:35:22.000Z\"", json); + } +} diff --git a/test/Dapr.Messaging.Test/Clients/ProgrammaticClient/CloudEventTests.cs b/test/Dapr.Messaging.Test/Clients/ProgrammaticClient/CloudEventTests.cs new file mode 100644 index 000000000..2a9def0ad --- /dev/null +++ b/test/Dapr.Messaging.Test/Clients/ProgrammaticClient/CloudEventTests.cs @@ -0,0 +1,83 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; +using Dapr.Messaging.JsonConverters; +using Dapr.Messaging.PublishSubscribe; +using Shouldly; +using Uri = System.Uri; + +namespace Dapr.Messaging.Test.Clients.ProgrammaticClient; + +public class CloudEventTests +{ + [Fact] + public void CloudEvent_ShouldHaveCorrectProperties() + { + var source = new Uri("https://example.com"); + const string type = "example.type"; + const string subject = "example.subject"; + var time = DateTimeOffset.UtcNow; + + var cloudEvent = new CloudEvent(source, type) { Subject = subject, Time = time }; + + cloudEvent.Source.ShouldBe(source); + cloudEvent.Type.ShouldBe(type); + cloudEvent.Subject.ShouldBe(subject); + cloudEvent.Time.ShouldBe(time); + cloudEvent.SpecVersion.ShouldBe("1.0"); + } + + [Fact] + public void CloudEvent_ShouldSerializeCorrectly() + { + var source = new Uri("https://example.com"); + const string type = "example.type"; + var cloudEvent = new CloudEvent(source, type); + var json = JsonSerializer.Serialize(cloudEvent); + + json.ShouldContain("\"source\":\"https://example.com\""); + json.ShouldContain($"\"type\":\"{type}\""); + json.ShouldContain("\"specversion\":\"1.0\""); + } + + [Fact] + public void TypedCloudEvent_ShouldHaveCorrectProperties() + { + var source = new Uri("https://example.com"); + const string type = "example.type"; + var cloudEvent = new CloudEvent(source, type); + + var json = JsonSerializer.Serialize(cloudEvent); + json.ShouldContain("\"source\":\"https://example.com\""); + json.ShouldContain($"\"type\":\"{type}\""); + json.ShouldContain("\"specversion\":\"1.0\""); + } + + [Fact] + public void TypedCloudEvent_ShouldSerializeCorrectly() + { + var source = new Uri("https://example.com"); + const string type = "example.type"; + var data = new { Key = "value" }; + var cloudEventWithData = new CloudEvent(source, type, data); + + var options = new JsonSerializerOptions { Converters = { new CloudEventDataJsonSerializer() } }; + var json = JsonSerializer.Serialize(cloudEventWithData, options); + + json.ShouldContain("\"source\":\"https://example.com/\""); + json.ShouldContain($"\"type\":\"{type}\""); + json.ShouldContain("\"data\":{\"Key\":\"value\"}"); + json.ShouldContain("\"specversion\":\"1.0\""); + } +} diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/Clients/StreamingClient/MessageHandlingPolicyTest.cs similarity index 96% rename from test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs rename to test/Dapr.Messaging.Test/Clients/StreamingClient/MessageHandlingPolicyTest.cs index 47dc68976..8282597a9 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs +++ b/test/Dapr.Messaging.Test/Clients/StreamingClient/MessageHandlingPolicyTest.cs @@ -11,9 +11,9 @@ // limitations under the License. // ------------------------------------------------------------------------ -using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.Clients.StreamingClient; -namespace Dapr.Messaging.Test.PublishSubscribe; +namespace Dapr.Messaging.Test.Clients.StreamingClient; public class MessageHandlingPolicyTest { @@ -64,4 +64,4 @@ public void Test_MessageHandlingPolicy_Immutability() Assert.Equal(timeoutDuration, policy1.TimeoutDuration); Assert.Equal(defaultResponseAction, policy1.DefaultResponseAction); } -} \ No newline at end of file +} diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/Clients/StreamingClient/PublishSubscribeReceiverTests.cs similarity index 92% rename from test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs rename to test/Dapr.Messaging.Test/Clients/StreamingClient/PublishSubscribeReceiverTests.cs index f8070aa66..67c0831bb 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/Clients/StreamingClient/PublishSubscribeReceiverTests.cs @@ -13,12 +13,12 @@ using System.Threading.Channels; using Dapr.AppCallback.Autogen.Grpc.v1; -using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.Clients.StreamingClient; using Grpc.Core; using Moq; using P = Dapr.Client.Autogen.Grpc.v1; -namespace Dapr.Messaging.Test.PublishSubscribe; +namespace Dapr.Messaging.Test.Clients.StreamingClient; public class PublishSubscribeReceiverTests { @@ -28,7 +28,7 @@ public void SubscribeAsync_ShouldNotBlock() const string pubSubName = "testPubSub"; const string topicName = "testTopic"; var options = - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) { MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; @@ -65,7 +65,7 @@ public void Constructor_ShouldInitializeCorrectly() const string pubSubName = "testPubSub"; const string topicName = "testTopic"; var options = - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) { MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; @@ -85,7 +85,7 @@ public async Task ProcessTopicChannelMessagesAsync_ShouldProcessMessages() const string pubSubName = "testPubSub"; const string topicName = "testTopic"; var options = - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) { MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; @@ -127,7 +127,7 @@ public async Task SubscribeAsync_ShouldProcessAcknowledgements() { const string pubSubName = "testPubSub"; const string topicName = "testTopic"; - var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(30), TopicResponseAction.Success)) + var options = new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(30), TopicResponseAction.Success)) { MaximumQueuedMessages = 100 // Example value, adjust as needed }; @@ -177,7 +177,7 @@ public async Task DisposeAsync_ShouldCompleteChannels() const string pubSubName = "testPubSub"; const string topicName = "testTopic"; var options = - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + new DaprStreamingSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) { MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj index 0b10230f7..724acce37 100644 --- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -23,6 +23,7 @@ + diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index e51aa8374..017d706fd 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -11,6 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ +using Dapr.Messaging.Clients.StreamingClient; using Dapr.Messaging.PublishSubscribe; using Dapr.Messaging.PublishSubscribe.Extensions; using Microsoft.Extensions.Configuration; @@ -39,7 +40,7 @@ public void AddDaprMessagingClient_FromIConfiguration() var app = services.BuildServiceProvider(); - var pubSubClient = app.GetRequiredService() as DaprPublishSubscribeGrpcClient; + var pubSubClient = app.GetRequiredService() as DaprPubSubStreamingGrpcClient; Assert.NotNull(pubSubClient!); Assert.Equal(apiToken, pubSubClient.DaprApiToken); @@ -52,7 +53,7 @@ public void AddDaprPubSubClient_RegistersIHttpClientFactory() services.AddDaprPubSubClient(); var serviceProvider = services.BuildServiceProvider(); - var daprClient = serviceProvider.GetService(); + var daprClient = serviceProvider.GetService(); Assert.NotNull(daprClient); } @@ -66,12 +67,12 @@ public void AddDaprPubSubClient_CallsConfigureAction() services.AddDaprPubSubClient(Configure); var serviceProvider = services.BuildServiceProvider(); - var daprClient = serviceProvider.GetService(); + var daprClient = serviceProvider.GetService(); Assert.NotNull(daprClient); Assert.True(configureCalled); return; - void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder) + void Configure(IServiceProvider sp, DaprPubSubStreamingClientBuilder builder) { configureCalled = true; } @@ -87,7 +88,7 @@ public void AddDaprPubSubClient_RegistersServicesCorrectly() var httpClientFactory = serviceProvider.GetService(); Assert.NotNull(httpClientFactory); - var daprPubSubClient = serviceProvider.GetService(); + var daprPubSubClient = serviceProvider.GetService(); Assert.NotNull(daprPubSubClient); } @@ -99,8 +100,8 @@ public void RegisterPubsubClient_ShouldRegisterSingleton_WhenLifetimeIsSingleton services.AddDaprPubSubClient(lifetime: ServiceLifetime.Singleton); var serviceProvider = services.BuildServiceProvider(); - var daprPubSubClient1 = serviceProvider.GetService(); - var daprPubSubClient2 = serviceProvider.GetService(); + var daprPubSubClient1 = serviceProvider.GetService(); + var daprPubSubClient2 = serviceProvider.GetService(); Assert.NotNull(daprPubSubClient1); Assert.NotNull(daprPubSubClient2); @@ -117,10 +118,10 @@ public async Task RegisterPubsubClient_ShouldRegisterScoped_WhenLifetimeIsScoped var serviceProvider = services.BuildServiceProvider(); await using var scope1 = serviceProvider.CreateAsyncScope(); - var daprPubSubClient1 = scope1.ServiceProvider.GetService(); + var daprPubSubClient1 = scope1.ServiceProvider.GetService(); await using var scope2 = serviceProvider.CreateAsyncScope(); - var daprPubSubClient2 = scope2.ServiceProvider.GetService(); + var daprPubSubClient2 = scope2.ServiceProvider.GetService(); Assert.NotNull(daprPubSubClient1); Assert.NotNull(daprPubSubClient2); @@ -135,8 +136,8 @@ public void RegisterPubsubClient_ShouldRegisterTransient_WhenLifetimeIsTransient services.AddDaprPubSubClient(lifetime: ServiceLifetime.Transient); var serviceProvider = services.BuildServiceProvider(); - var daprPubSubClient1 = serviceProvider.GetService(); - var daprPubSubClient2 = serviceProvider.GetService(); + var daprPubSubClient1 = serviceProvider.GetService(); + var daprPubSubClient2 = serviceProvider.GetService(); Assert.NotNull(daprPubSubClient1); Assert.NotNull(daprPubSubClient2); diff --git a/test/Dapr.Messaging.Test/JsonConverters/CloudEventJsonDataSerializerTest.cs b/test/Dapr.Messaging.Test/JsonConverters/CloudEventJsonDataSerializerTest.cs new file mode 100644 index 000000000..b66accf11 --- /dev/null +++ b/test/Dapr.Messaging.Test/JsonConverters/CloudEventJsonDataSerializerTest.cs @@ -0,0 +1,93 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Text.Json; +using Dapr.Messaging.JsonConverters; +using Dapr.Messaging.PublishSubscribe; + +namespace Dapr.Messaging.Test.JsonConverters; + +public class CloudEventJsonDataSerializerTest +{ + private readonly CloudEventDataJsonSerializer _converter = new(); + + [Fact] + public void Write_ShouldWriteCloudEventWithData_WhenDataContentTypeIsJson() + { + var cloudEvent = new CloudEvent( + new Uri("https://example.com/source"), + "example.type", + "example data") + { + Subject = "example subject", + Time = DateTimeOffset.Parse("2025-04-13T06:35:22.000Z"), + DataContentType = "application/json" + }; + + var options = new JsonSerializerOptions(); + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + _converter.Write(writer, cloudEvent, options); + writer.Flush(); + + var json = System.Text.Encoding.UTF8.GetString(stream.ToArray()); + const string expectedJson = "{\"source\":\"https://example.com/source\",\"type\":\"example.type\",\"specversion\":\"1.0\",\"subject\":\"example subject\",\"time\":\"2025-04-13T06:35:22.000Z\",\"Data\":\"example data\"}"; + Assert.Equal(expectedJson, json); + } + + [Fact] + public void Write_ShouldWriteCloudEventWithData_WhenDataContentTypeIsNotJson() + { + var cloudEvent = new CloudEvent( + new Uri("https://example.com/source"), + "example.type", + "example data") + { + Subject = "example subject", + Time = DateTimeOffset.Parse("2025-04-13T06:35:22.000Z"), + DataContentType = "text/plain" + }; + + var options = new JsonSerializerOptions(); + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + _converter.Write(writer, cloudEvent, options); + writer.Flush(); + + var json = System.Text.Encoding.UTF8.GetString(stream.ToArray()); + const string expectedJson = "{\"source\":\"https://example.com/source\",\"type\":\"example.type\",\"specversion\":\"1.0\",\"subject\":\"example subject\",\"time\":\"2025-04-13T06:35:22.000Z\",\"Data\":\"example data\"}"; + Assert.Equal(expectedJson, json); + } + + [Fact] + public void Write_ShouldWriteCloudEventWithoutOptionalFields() + { + var cloudEvent = new CloudEvent( + new Uri("https://example.com/source"), + "example.type", + "example data"); + + var options = new JsonSerializerOptions(); + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + _converter.Write(writer, cloudEvent, options); + writer.Flush(); + + var json = System.Text.Encoding.UTF8.GetString(stream.ToArray()); + const string expectedJson = "{\"source\":\"https://example.com/source\",\"type\":\"example.type\",\"specversion\":\"1.0\",\"Data\":\"example data\"}"; + Assert.Equal(expectedJson, json); + } +}