diff --git a/WebJobs.Extensions.Redis.sln b/WebJobs.Extensions.Redis.sln index 7a3cd4f..f58c7a4 100644 --- a/WebJobs.Extensions.Redis.sln +++ b/WebJobs.Extensions.Redis.sln @@ -14,13 +14,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Functions.W EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.Redis", "src\Microsoft.Azure.WebJobs.Extensions.Redis\Microsoft.Azure.WebJobs.Extensions.Redis.csproj", "{A1A50657-EB50-4B7C-9BDD-AB2A357F8235}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReadThroughSamples", "samples\CosmosDBIntegration\ReadThroughSamples\ReadThroughSamples.csproj", "{77A0D8F6-CAB3-450B-BBCB-658C8A47EB3C}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WriteAroundSamples", "samples\CosmosDBIntegration\WriteAroundSamples\WriteAroundSamples.csproj", "{780469B0-E554-46F8-8333-4C1630C04E3D}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WriteBehindSamples", "samples\CosmosDBIntegration\WriteBehindSamples\WriteBehindSamples.csproj", "{C6273150-22D0-4967-B7B6-9A2EA674053D}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WriteThroughSamples", "samples\CosmosDBIntegration\WriteThroughSamples\WriteThroughSamples.csproj", "{61001020-ABEE-4B06-BF56-1C1989838CBA}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB", "samples\CosmosDBIntegration\ReadThroughSamples\Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.csproj", "{77A0D8F6-CAB3-450B-BBCB-658C8A47EB3C}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples", "samples\dotnet-isolated\Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.csproj", "{9EA54616-32DD-44C5-BB24-D0FEA4733850}" EndProject @@ -50,18 +44,6 @@ Global {77A0D8F6-CAB3-450B-BBCB-658C8A47EB3C}.Debug|Any CPU.Build.0 = Debug|Any CPU {77A0D8F6-CAB3-450B-BBCB-658C8A47EB3C}.Release|Any CPU.ActiveCfg = Release|Any CPU {77A0D8F6-CAB3-450B-BBCB-658C8A47EB3C}.Release|Any CPU.Build.0 = Release|Any CPU - {780469B0-E554-46F8-8333-4C1630C04E3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {780469B0-E554-46F8-8333-4C1630C04E3D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {780469B0-E554-46F8-8333-4C1630C04E3D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {780469B0-E554-46F8-8333-4C1630C04E3D}.Release|Any CPU.Build.0 = Release|Any CPU - {C6273150-22D0-4967-B7B6-9A2EA674053D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {C6273150-22D0-4967-B7B6-9A2EA674053D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {C6273150-22D0-4967-B7B6-9A2EA674053D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {C6273150-22D0-4967-B7B6-9A2EA674053D}.Release|Any CPU.Build.0 = Release|Any CPU - {61001020-ABEE-4B06-BF56-1C1989838CBA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {61001020-ABEE-4B06-BF56-1C1989838CBA}.Debug|Any CPU.Build.0 = Debug|Any CPU - {61001020-ABEE-4B06-BF56-1C1989838CBA}.Release|Any CPU.ActiveCfg = Release|Any CPU - {61001020-ABEE-4B06-BF56-1C1989838CBA}.Release|Any CPU.Build.0 = Release|Any CPU {9EA54616-32DD-44C5-BB24-D0FEA4733850}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9EA54616-32DD-44C5-BB24-D0FEA4733850}.Debug|Any CPU.Build.0 = Debug|Any CPU {9EA54616-32DD-44C5-BB24-D0FEA4733850}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/ReadThroughSamples.csproj b/samples/CosmosDBIntegration/ReadThroughSamples/Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.csproj similarity index 81% rename from samples/CosmosDBIntegration/ReadThroughSamples/ReadThroughSamples.csproj rename to samples/CosmosDBIntegration/ReadThroughSamples/Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.csproj index 75540d6..c973ea7 100644 --- a/samples/CosmosDBIntegration/ReadThroughSamples/ReadThroughSamples.csproj +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.csproj @@ -3,14 +3,10 @@ net6.0 v4 - - - - - + diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/Models/CosmosDBListData.cs b/samples/CosmosDBIntegration/ReadThroughSamples/Models/CosmosDBListData.cs new file mode 100644 index 0000000..3251b7d --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Models/CosmosDBListData.cs @@ -0,0 +1,9 @@ + using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models +{ + public record CosmosDBListData( + string id, + List value + ); +} \ No newline at end of file diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/Models/PubSubData.cs b/samples/CosmosDBIntegration/ReadThroughSamples/Models/PubSubData.cs new file mode 100644 index 0000000..41bf50b --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Models/PubSubData.cs @@ -0,0 +1,11 @@ +using System; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models +{ + public record PubSubData( + string id, + string channel, + string message, + DateTime timestamp + ); +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/Models/RedisData.cs b/samples/CosmosDBIntegration/ReadThroughSamples/Models/RedisData.cs new file mode 100644 index 0000000..89f7c27 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Models/RedisData.cs @@ -0,0 +1,11 @@ +using System; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models +{ + public record RedisData( + string id, + string key, + string value, + DateTime timestamp + ); +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamData.cs b/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamData.cs new file mode 100644 index 0000000..348d5a8 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamData.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System.Collections.Generic; +using System.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models +{ + public class StreamData + { + public string id { get; set; } + public Dictionary values { get; set; } + + // Helper method to format stream message + public static StreamData Format(StreamEntry entry, ILogger logger) + { + logger.LogInformation("ID: {val}", entry.Id.ToString()); + + // Map each key value pair + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); + + StreamData sampleItem = new StreamData { id = entry.Id, values = dict }; + return sampleItem; + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamDataSingleDocument.cs b/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamDataSingleDocument.cs new file mode 100644 index 0000000..24a079f --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/Models/StreamDataSingleDocument.cs @@ -0,0 +1,51 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System.Collections.Generic; +using System.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models +{ + public class StreamDataSingleDocument + { + public string id { get; set; } + public int maxlen { get; set; } + public Dictionary> messages { get; set; } + + public static StreamDataSingleDocument CreateNewEntry(StreamEntry entry, string streamName, ILogger logger) + { + logger.LogInformation("Creating a new document for {val}. Inserting ID: {val} as the first entry", streamName, entry.Id.ToString()); + + // Map each key value pair + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); + + // Create a new list of messages + var list = new Dictionary>(); + list.Add(entry.Id.ToString(), dict); + + StreamDataSingleDocument data = new StreamDataSingleDocument { id = streamName, maxlen = 1000, messages = list }; + return data; + } + + public static StreamDataSingleDocument UpdateExistingEntry(StreamDataSingleDocument results, StreamEntry entry, ILogger logger) + { + logger.LogInformation("Adding to {val} document. Inserting ID: {val} ", results.id, entry.Id.ToString()); + + // Map each key value pair + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); + + // Update list of messages + var list = results.messages; + list.Add(entry.Id.ToString(), dict); + + if (list.Count > results.maxlen) + { + string minKey = list.Keys.Min(); + list.Remove(minKey); + } + + StreamDataSingleDocument data = new StreamDataSingleDocument { id = results.id, maxlen = results.maxlen, messages = list }; + return data; + } + + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/ListSample.cs b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/ListReadThrough.cs similarity index 90% rename from samples/CosmosDBIntegration/ReadThroughSamples/ListSample.cs rename to samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/ListReadThrough.cs index a26e9dd..c59aaa2 100644 --- a/samples/CosmosDBIntegration/ReadThroughSamples/ListSample.cs +++ b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/ListReadThrough.cs @@ -1,15 +1,15 @@ using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Linq; -using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.Models; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; using Microsoft.Extensions.Logging; using StackExchange.Redis; using System; using System.Linq; using System.Threading.Tasks; -namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.ReadThrough { - public static class ListSample + public static class ListReadThrough { //Redis Cache primary connection string from local.settings.json public const string RedisConnectionString = "RedisConnectionString"; @@ -22,28 +22,6 @@ public static class ListSample //Uses the key of the user's choice and should be changed accordingly public const string ListKey = "userListName"; - /// - /// Adds a CosmosDBListData item to a Redis list with a specific key. - /// - /// The response object returned by a Cosmos DB query. - /// The item to be added to the Redis cache. - /// The key for the Redis list to which the item will be added. - /// None - public static async Task ToCacheAsync(FeedResponse response, CosmosDBListData item, string listEntry) - { - //Retrieve the values in cosmos associated with the list name, so you can access each item - var fullEntry = response.Take(response.Count); - - if (fullEntry == null) return; - - //Accessing each value from the entry - foreach (CosmosDBListData inputValues in fullEntry) - { - RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item); - await s_redisDb.Value.ListRightPushAsync(listEntry, redisValues); - } - } - /// /// Function that retrieves a list from CosmosDB based on a Redis key miss event, and stores it in Redis cache using read-through caching. /// The function takes a RedisPubSubTrigger attribute, which listens for key miss events on the Redis cache @@ -52,8 +30,8 @@ public static async Task ToCacheAsync(FeedResponse response, C /// A Cosmos DB client object used to connect to the database. /// An ILogger object used for logging purposes. /// - [FunctionName(nameof(ListTriggerReadThroughFunc))] - public static async Task ListTriggerReadThroughFunc( + [FunctionName(nameof(ListReadThrough))] + public static async Task Run( [RedisPubSubTrigger(RedisConnectionString, "__keyevent@0__:keymiss")] string listEntry, [CosmosDB( Connection = "CosmosDBConnectionString" )]CosmosClient client, ILogger logger) @@ -89,5 +67,27 @@ public static async Task ListTriggerReadThroughFunc( await ToCacheAsync(response, item, listEntry); } } + + /// + /// Adds a CosmosDBListData item to a Redis list with a specific key. + /// + /// The response object returned by a Cosmos DB query. + /// The item to be added to the Redis cache. + /// The key for the Redis list to which the item will be added. + /// None + private static async Task ToCacheAsync(FeedResponse response, CosmosDBListData item, string listEntry) + { + //Retrieve the values in cosmos associated with the list name, so you can access each item + var fullEntry = response.Take(response.Count); + + if (fullEntry == null) return; + + //Accessing each value from the entry + foreach (CosmosDBListData inputValues in fullEntry) + { + RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item); + await s_redisDb.Value.ListRightPushAsync(listEntry, redisValues); + } + } } } \ No newline at end of file diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/StreamSample.cs b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StreamSingleDocumentReadThrough.cs similarity index 88% rename from samples/CosmosDBIntegration/ReadThroughSamples/StreamSample.cs rename to samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StreamSingleDocumentReadThrough.cs index 0ddb0bd..b994bf7 100644 --- a/samples/CosmosDBIntegration/ReadThroughSamples/StreamSample.cs +++ b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StreamSingleDocumentReadThrough.cs @@ -1,79 +1,79 @@ -using Microsoft.Azure.Cosmos; -using Microsoft.Azure.Cosmos.Linq; -using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.Models; -using Microsoft.Extensions.Logging; -using StackExchange.Redis; -using System; -using System.Linq; -using System.Threading.Tasks; - -namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples -{ - internal class StreamSample - { - // Redis connection string and stream names stored in local.settings.json - public const string RedisConnectionSetting = "RedisConnectionString"; - private static readonly Lazy _redisDB = new Lazy(() => - ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase()); - public static string StreamNameSingleDocument = Environment.GetEnvironmentVariable("StreamTest"); - - // CosmosDB connection string, database name and container name stored in local.settings.json - public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; - public const string DatabaseSetting = "%CosmosDbDatabaseId%"; - public const string ContainerSettingSingleDocument = "%StreamCosmosDbContainerIdSingleDocument%"; - - /// - /// Read Through: If the stream does not exist in Redis, refresh it with the values saved in CosmosDB - /// - /// The message which has gone through the stream. Includes message id alongside the key/value pairs - /// Container for where the CosmosDB items are stored - /// ILogger used to write key information - /// - [FunctionName(nameof(ReadThroughForStreamSingleDocumentAsync))] - public static async Task ReadThroughForStreamSingleDocumentAsync( - [RedisPubSubTrigger(RedisConnectionSetting, "__keyevent@0__:keymiss")] string entry, - [CosmosDB( - databaseName: DatabaseSetting, - containerName: ContainerSettingSingleDocument, - Connection = CosmosDbConnectionSetting)] CosmosClient cosmosDbClient, - ILogger logger) - { - - if (entry != StreamNameSingleDocument) return; - - // Connect CosmosDB container - Container cosmosDbContainer = cosmosDbClient.GetContainer(Environment.GetEnvironmentVariable(DatabaseSetting.Replace("%", "")), Environment.GetEnvironmentVariable(ContainerSettingSingleDocument.Replace("%", ""))); - - // Query Database by the stream name - FeedIterator query = cosmosDbContainer.GetItemLinqQueryable(true) - .Where(b => b.id == StreamNameSingleDocument) - .ToFeedIterator(); - FeedResponse response = await query.ReadNextAsync(); - StreamDataSingleDocument results = response.FirstOrDefault(defaultValue: null); - - // If stream not found - if (results == null) - { - logger.LogWarning("{streamNameSingleDocument} was not found in the database, failed to read stream", StreamNameSingleDocument); - } - else - { - logger.LogInformation("{streamNameSingleDocument} was found in the database", StreamNameSingleDocument); - - // Go through each message and format the key/value pairs - foreach (var message in results.messages) - { - var values = new NameValueEntry[message.Value.Count]; - int i = 0; - foreach (var pair in message.Value) - { - values[i++] = new NameValueEntry(pair.Key, pair.Value); - } - - // Upload value to Redis Stream - await _redisDB.Value.StreamAddAsync(StreamNameSingleDocument, values, messageId: message.Key, maxLength: results.maxlen); - } - } - } - } -} +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.Cosmos.Linq; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Linq; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.ReadThrough +{ + internal class StreamSingleDocumentReadThrough + { + // Redis connection string and stream names stored in local.settings.json + public const string RedisConnectionSetting = "RedisConnectionString"; + private static readonly Lazy _redisDB = new Lazy(() => + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase()); + public static string StreamNameSingleDocument = Environment.GetEnvironmentVariable("StreamTest"); + + // CosmosDB connection string, database name and container name stored in local.settings.json + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSettingSingleDocument = "%StreamCosmosDbContainerIdSingleDocument%"; + + /// + /// Read Through: If the stream does not exist in Redis, refresh it with the values saved in CosmosDB + /// + /// The message which has gone through the stream. Includes message id alongside the key/value pairs + /// Container for where the CosmosDB items are stored + /// ILogger used to write key information + /// + [FunctionName(nameof(StreamSingleDocumentReadThrough))] + public static async Task Run( + [RedisPubSubTrigger(RedisConnectionSetting, "__keyevent@0__:keymiss")] string entry, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: ContainerSettingSingleDocument, + Connection = CosmosDbConnectionSetting)] CosmosClient cosmosDbClient, + ILogger logger) + { + + if (entry != StreamNameSingleDocument) return; + + // Connect CosmosDB container + Container cosmosDbContainer = cosmosDbClient.GetContainer(Environment.GetEnvironmentVariable(DatabaseSetting.Replace("%", "")), Environment.GetEnvironmentVariable(ContainerSettingSingleDocument.Replace("%", ""))); + + // Query Database by the stream name + FeedIterator query = cosmosDbContainer.GetItemLinqQueryable(true) + .Where(b => b.id == StreamNameSingleDocument) + .ToFeedIterator(); + FeedResponse response = await query.ReadNextAsync(); + StreamDataSingleDocument results = response.FirstOrDefault(defaultValue: null); + + // If stream not found + if (results == null) + { + logger.LogWarning("{streamNameSingleDocument} was not found in the database, failed to read stream", StreamNameSingleDocument); + } + else + { + logger.LogInformation("{streamNameSingleDocument} was found in the database", StreamNameSingleDocument); + + // Go through each message and format the key/value pairs + foreach (var message in results.messages) + { + var values = new NameValueEntry[message.Value.Count]; + int i = 0; + foreach (var pair in message.Value) + { + values[i++] = new NameValueEntry(pair.Key, pair.Value); + } + + // Upload value to Redis Stream + await _redisDB.Value.StreamAddAsync(StreamNameSingleDocument, values, messageId: message.Key, maxLength: results.maxlen); + } + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/PubSubSample.cs b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StringReadThrough.cs similarity index 87% rename from samples/CosmosDBIntegration/ReadThroughSamples/PubSubSample.cs rename to samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StringReadThrough.cs index 427ec9c..36445b7 100644 --- a/samples/CosmosDBIntegration/ReadThroughSamples/PubSubSample.cs +++ b/samples/CosmosDBIntegration/ReadThroughSamples/ReadThrough/StringReadThrough.cs @@ -1,15 +1,15 @@ using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Linq; -using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.Models; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; using Microsoft.Extensions.Logging; using StackExchange.Redis; using System; using System.Linq; using System.Threading.Tasks; -namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.ReadThrough { - public static class PubSubSample + public static class StringReadThrough { //Connection string settings that will be resolved from local.settings.json file public const string RedisConnectionSetting = "RedisConnectionString"; @@ -32,8 +32,8 @@ public static class PubSubSample /// An ILogger that is used to write informational log messages. /// /// Thrown when the requested key is not found in Redis or Cosmos DB - [FunctionName(nameof(PubsubReadThroughAsync))] - public static async Task PubsubReadThroughAsync( + [FunctionName(nameof(StringReadThrough))] + public static async Task Run( [RedisPubSubTrigger(RedisConnectionSetting, "__keyevent@0__:keymiss")] string missedKey, [CosmosDB( databaseName: DatabaseSetting, @@ -41,7 +41,7 @@ public static async Task PubsubReadThroughAsync( Connection = CosmosDbConnectionSetting)]CosmosClient cosmosDB, ILogger logger) { - if (missedKey == StreamSample.StreamNameSingleDocument || missedKey == ListSample.ListKey) return; + if (missedKey == StreamSingleDocumentReadThrough.StreamNameSingleDocument || missedKey == ListReadThrough.ListKey) return; //get the Cosmos DB database and the container to read from Container cosmosDBContainer = cosmosDB.GetContainer(Environment.GetEnvironmentVariable(DatabaseSetting.Replace("%", "")), Environment.GetEnvironmentVariable(ContainerSetting.Replace("%", ""))); @@ -50,7 +50,7 @@ public static async Task PubsubReadThroughAsync( //get all entries in the container that contain the missed key using FeedIterator feed = queryable .Where(p => p.key == missedKey) - .OrderByDescending(p => p.timestamp) + .OrderByDescending((RedisData p) => p.timestamp) .ToFeedIterator(); FeedResponse response = await feed.ReadNextAsync(); diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/ListWriteAround.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/ListWriteAround.cs new file mode 100644 index 0000000..ce38dbd --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/ListWriteAround.cs @@ -0,0 +1,55 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteAround +{ + public static class ListWriteAround + { + //Redis Cache primary connection string from local.settings.json + public const string RedisConnectionString = "RedisConnectionString"; + private static readonly IDatabase s_redisDb = ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionString)).GetDatabase(); + + //CosmosDB Endpoint from local.settings.json + public const string CosmosDBConnectionString = "CosmosDBConnectionString"; + + //Uses the key of the user's choice and should be changed accordingly + public const string ListKey = "userListName"; + + /// + /// This function is triggered by changes to a specified CosmosDB container. It retrieves a list of items that have been modified or added + /// to the container and adds them to a Redis cache. The function converts each item's collection of values into an array and pushes the array to the Redis cache. + /// + /// An IReadOnlyList of ListData objects representing the items that have been modified or added to the CosmosDB container. + /// An ILogger object used for logging purposes. + [FunctionName(nameof(ListWriteAround))] + public static void Run([CosmosDBTrigger( + databaseName: "%CosmosDbDatabaseId%", + containerName: "%ListCosmosDbContainerId%", + Connection = "CosmosDBConnectionString", + LeaseContainerName = "leases")]IReadOnlyList readOnlyList, ILogger log) + { + if (readOnlyList == null || readOnlyList.Count <= 0) return; + + //Accessing each entry from readOnlyList + foreach (CosmosDBListData inputValues in readOnlyList) + { + if (inputValues.id == ListKey) + { + //Converting one entry into an array format + RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item); + s_redisDb.ListRightPush(ListKey, redisValues); + + //Optional foreach loop + log to confirm each value is sent to the cache + foreach (RedisValue entryValue in redisValues) + { + log.LogInformation("Saved item " + entryValue + " in Azure Redis cache"); + + } + } + } + } + } +} \ No newline at end of file diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/PubSubWriteAround.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/PubSubWriteAround.cs new file mode 100644 index 0000000..005c645 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/PubSubWriteAround.cs @@ -0,0 +1,52 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteAround +{ + public static class PubSubWriteAround + { + //Connection string settings that will be resolved from local.settings.json file + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + + //Cosmos DB settings that will be resolved from local.settings.json file + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%PubSubCosmosDbContainerId%"; + public const string PubSubContainerSetting = "%MessagesCosmosDbContainerId%"; + + private static readonly Lazy s_redisConnection = new Lazy(() => + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting))); + + /// + /// Triggers when pubsub messages are written directly to Cosmos DB, then asynchronously publishes the associated message to the Redis channel that was specified. + /// + /// A readonly list containing all the new/updated documents in the specified Cosmos DB container. + /// An ILogger that is used to write informational log messages. + /// + [FunctionName(nameof(PubSubWriteAround))] + public static async Task Run( + [CosmosDBTrigger( + databaseName: DatabaseSetting, + containerName: PubSubContainerSetting, + Connection = CosmosDbConnectionSetting, + LeaseContainerName = "leases", LeaseContainerPrefix = "Write-Around-")]IReadOnlyList cosmosData, + ILogger logger) + { + //if the list is null or empty, return + if (cosmosData == null || cosmosData.Count <= 0) return; + + ISubscriber redisPublisher = s_redisConnection.Value.GetSubscriber(); + //for each new item upladed to cosmos, publish to Redis + foreach (var document in cosmosData) + { + //publish the message to the correct Redis channel + await redisPublisher.PublishAsync(document.channel, document.message); + logger.LogInformation($"Message: \"{document.message}\" has been published on channel: \"{document.channel}\"."); + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamSingleDocumentWriteAround.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamSingleDocumentWriteAround.cs new file mode 100644 index 0000000..841ed9a --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamSingleDocumentWriteAround.cs @@ -0,0 +1,62 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteAround +{ + internal class StreamSingleDocumentWriteAround + { + // Redis database and stream stored in local.settings.json + public const string RedisConnectionSetting = "RedisConnectionString"; + private static readonly Lazy _redisDB = new Lazy(() => + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase()); + public static string StreamName = Environment.GetEnvironmentVariable("StreamTest"); + + // CosmosDB connection string, database name and container name stored in local.settings.json + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%StreamCosmosDbContainerId%"; + public const string ContainerSettingSingleDocument = "%StreamCosmosDbContainerIdSingleDocument%"; + public static string StreamNameSingleDocument = Environment.GetEnvironmentVariable("StreamTestSingleDocument"); + + /// + /// Write Around (Single Document): Write from Cosmos DB to Redis whenever a change occurs in one of the CosmosDB documents + /// + /// List of changed documents in CosmosDB + /// ILogger used to write key information + [FunctionName(nameof(StreamSingleDocumentWriteAround))] + public static async Task Run( + [CosmosDBTrigger( + databaseName: DatabaseSetting, + containerName: ContainerSettingSingleDocument, + Connection = CosmosDbConnectionSetting, + LeaseContainerName = "leases", + CreateLeaseContainerIfNotExists = true)]IReadOnlyList input, ILogger logger) + { + if (input == null) return; + + // Iterate through each changed document + foreach (var document in input) + { + logger.LogInformation("{stream1} changed and sent to {stream2} ", document.id, StreamNameSingleDocument); + + // Go through each message and format the key/value pairs + foreach (var message in document.messages) + { + var values = new NameValueEntry[message.Value.Count]; + int i = 0; + foreach (var entry in message.Value) + { + values[i++] = new NameValueEntry(entry.Key, entry.Value); + } + + // Upload value to Redis Stream + await _redisDB.Value.StreamAddAsync(StreamNameSingleDocument, values, messageId: message.Key, maxLength: document.maxlen); + } + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamWriteAround.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamWriteAround.cs new file mode 100644 index 0000000..db1a1dc --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StreamWriteAround.cs @@ -0,0 +1,58 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteAround +{ + internal class StreamWriteAround + { + // Redis database and stream stored in local.settings.json + public const string RedisConnectionSetting = "RedisConnectionString"; + private static readonly Lazy _redisDB = new Lazy(() => + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase()); + public static string StreamName = Environment.GetEnvironmentVariable("StreamTest"); + + // CosmosDB connection string, database name and container name stored in local.settings.json + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%StreamCosmosDbContainerId%"; + + /// + /// Write Around: Write from Cosmos DB to Redis whenever a change occurs in one of the CosmosDB documents + /// + /// List of changed documents in CosmosDB + /// ILogger used to write key information + [FunctionName(nameof(StreamWriteAround))] + public static async Task Run( + [CosmosDBTrigger( + databaseName: DatabaseSetting, + containerName: ContainerSetting, + Connection = CosmosDbConnectionSetting, + LeaseContainerName = "leases", + CreateLeaseContainerIfNotExists = true)]IReadOnlyList input, ILogger logger) + { + if (input == null) return; + + // Iterate through each changed document + foreach (var document in input) + { + logger.LogInformation("{messageID} changed and sent to {stream} ", document.id, StreamName); + + var values = new NameValueEntry[document.values.Count]; + int i = 0; + + // Format the key/value pairs + foreach (KeyValuePair entry in document.values) + { + values[i++] = new NameValueEntry(entry.Key, entry.Value); + } + + // Upload value to Redis Stream + await _redisDB.Value.StreamAddAsync(StreamName, values); + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StringWriteAround.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StringWriteAround.cs new file mode 100644 index 0000000..16a8575 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteAround/StringWriteAround.cs @@ -0,0 +1,59 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteAround +{ + public static class StringWriteAround + { + //Connection string settings that will be resolved from local.settings.json file + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + + //Cosmos DB settings that will be resolved from local.settings.json file + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%PubSubCosmosDbContainerId%"; + public const string PubSubContainerSetting = "%MessagesCosmosDbContainerId%"; + + private static readonly Lazy s_redisConnection = new Lazy(() => + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting))); + + + /// + /// Triggers when key/value pairs are written directly to Cosmos DB then asynchronously writes the associated key/value pair to Redis. + /// + /// An IReadonlyList containing all the new/updated documents in the specified Cosmos DB container. + /// An ILogger that is used to write informational log messages. + /// + /// Thrown when they key/value pair is already stored in the Redis cache. + [FunctionName(nameof(StringWriteAround))] + public static async Task Run( + [CosmosDBTrigger( + databaseName: DatabaseSetting, + containerName: ContainerSetting, + Connection = CosmosDbConnectionSetting, + LeaseContainerName = "leases", LeaseContainerPrefix = "Write-Around-")]IReadOnlyList cosmosData, + ILogger logger) + { + //if the list is null or empty, return + if (cosmosData == null || cosmosData.Count <= 0) return; + + IDatabaseAsync redisDb = s_redisConnection.Value.GetDatabase(); + //for each item upladed to cosmos, write it to Redis + foreach (var document in cosmosData) + { + //if the key/value pair is already in Redis, throw an exception + if (await redisDb.StringGetAsync(document.key) == document.value) + { + throw new Exception($"ERROR: Key: \"{document.key}\", Value: \"{document.value}\" pair is already in Azure Redis Cache."); + } + //Write the key/value pair to Redis + await redisDb.StringSetAsync(document.key, document.value); + logger.LogInformation($"Key: \"{document.key}\", Value: \"{document.value}\" added to Redis."); + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/ListTriggerWriteBehind.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/ListTriggerWriteBehind.cs new file mode 100644 index 0000000..f9eaed1 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/ListTriggerWriteBehind.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Azure.Cosmos.Linq; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteBehind +{ + public static class ListTriggerWriteBehind + { + //Redis Cache primary connection string from local.settings.json + public const string RedisConnectionString = "RedisConnectionString"; + + //CosmosDB endpoint from local.settings.json + public const string CosmosDBConnectionString = "CosmosDBConnectionString"; + + //CosmosDB database name and container name from local.settings.json + public const string CosmosDbDatabaseId = "CosmosDbDatabaseId"; + public const string CosmosDbContainerId = "ListCosmosDbContainerId"; + + //Uses the key of the user's choice and should be changed accordingly + public const string ListKey = "userListName"; + + /// + /// This function retrieves a specified item from a CosmosDB container and adds a new entry to it. The entry is retrieved from a Redis list trigger and added to the specified item's collection of values. + /// + /// A string representing the value to be added to the CosmosDB item's collection. + /// >A Cosmos DB client object used to connect to the database. + /// An ILogger object used for logging purposes. + /// + [FunctionName(nameof(ListTriggerWriteBehind))] + public static async Task Run( + [RedisListTrigger(RedisConnectionString, ListKey)] string listEntry, [CosmosDB( + Connection = "CosmosDBConnectionString")]CosmosClient client, + ILogger logger) + { + // Retrieve the database and container from the given client, which accesses the CosmosDB Endpoint + Container db = client.GetDatabase(Environment.GetEnvironmentVariable(CosmosDbDatabaseId)).GetContainer(Environment.GetEnvironmentVariable(CosmosDbContainerId)); + + //Creates query for item in the container and + //uses feed iterator to keep track of token when receiving results from query + IOrderedQueryable query = db.GetItemLinqQueryable(); + using FeedIterator results = query + .Where(p => p.id == ListKey) + .ToFeedIterator(); + + //Retrieve collection of items from results and then the first element of the sequence + FeedResponse response = await results.ReadNextAsync(); + CosmosDBListData item = response.FirstOrDefault(defaultValue: null); + + //Optional logger to display what is being pushed to CosmosDB + logger.LogInformation("The value added to " + ListKey + " is " + listEntry + ". The value will be added to CosmosDB database: " + CosmosDbDatabaseId + " and container: " + CosmosDbContainerId + "."); + + //Create an entry if the key doesn't exist in CosmosDB or add to it if there is an existing entry + List resultsHolder = item?.value ?? new List(); + + resultsHolder.Add(listEntry); + CosmosDBListData newEntry = new CosmosDBListData(id: ListKey, value: resultsHolder); + await db.UpsertItemAsync(newEntry); + } + + } +} \ No newline at end of file diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/PubSubTriggerWriteBehind.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/PubSubTriggerWriteBehind.cs new file mode 100644 index 0000000..8c5b700 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteBehind/PubSubTriggerWriteBehind.cs @@ -0,0 +1,83 @@ +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteBehind +{ + public static class PubsubTriggerWriteBehind + { + //Connection string settings that will be resolved from local.settings.json file + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + + //Cosmos DB settings that will be resolved from local.settings.json file + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%PubSubCosmosDbContainerId%"; + public const string PubSubContainerSetting = "%MessagesCosmosDbContainerId%"; + public const string PubSubChannelSetting = "%PubSubChannel%"; + + private static readonly IDatabaseAsync s_redisDb = + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase(); + + + /// + /// Triggers when Redis keys are added or updated and asynchronously writes the key/value pair to Cosmos DB. + /// + /// The key that has been added or changed in Redis. + /// An IAsyncCollector that is used to write RedisData to Cosmos DB. + /// An ILogger that is used to write informational log messages. + /// + [FunctionName(nameof(PubsubTriggerWriteBehind))] + public static async Task Run( + [RedisPubSubTrigger(RedisConnectionSetting, "__keyevent@0__:set")] string newKey, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: ContainerSetting, + Connection = CosmosDbConnectionSetting)]IAsyncCollector cosmosDBOut, + ILogger logger) + { + //load data from Redis into a record + RedisData redisData = new RedisData( + id: Guid.NewGuid().ToString(), + key: newKey, + value: await s_redisDb.StringGetAsync(newKey), + timestamp: DateTime.UtcNow + ); + + //write the RedisData object to Cosmos DB using the IAsyncCollector + await cosmosDBOut.AddAsync(redisData); + logger.LogInformation($"Key: \"{newKey}\", Value: \"{redisData.value}\" added to Cosmos DB container: \"{Environment.GetEnvironmentVariable(ContainerSetting.Replace("%", ""))}\" at id: \"{redisData.id}\""); + } + + /// + /// Listens for messages sent on a Redis pub/sub channel and asynchronously writes the message and channel to Cosmos DB. + /// + /// The message that was published to Redis. + /// An IAsyncCollector that is used to write the PubSubData to Cosmos DB. + /// An ILogger that is used to write informational log messages. + /// + [FunctionName(nameof(PubsubWriteBehindMessageAsync))] + public static async Task PubsubWriteBehindMessageAsync( + [RedisPubSubTrigger(RedisConnectionSetting, PubSubChannelSetting)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: PubSubContainerSetting, + Connection = CosmosDbConnectionSetting)]IAsyncCollector cosmosDBOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + PubSubData redisData = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB using the IAsyncCollector + await cosmosDBOut.AddAsync(redisData); + logger.LogInformation($"Message: \"{redisData.message}\" from Channel: \"{redisData.channel}\" stored in Cosmos DB container: \"{Environment.GetEnvironmentVariable(PubSubContainerSetting.Replace("%", ""))}\" with id: \"{redisData.id}\""); + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerMessageWriteThrough.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerMessageWriteThrough.cs new file mode 100644 index 0000000..1e95903 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerMessageWriteThrough.cs @@ -0,0 +1,49 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteThrough +{ + public static class PubSubTriggerMessageWriteThrough + { + //Connection string settings that will be resolved from local.settings.json file + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + + //Cosmos DB settings that will be resolved from local.settings.json file + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%PubSubCosmosDbContainerId%"; + public const string PubSubContainerSetting = "%MessagesCosmosDbContainerId%"; + public const string PubSubChannelSetting = "%PubSubChannel%"; + + private static readonly IDatabase s_redisDb = + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase(); + + /// + /// Triggers when Redis pubsub messages are sent and synchronously writes the message and channel to Cosmos DB. + /// + /// The message that was published in Redis + /// A dynamic object that is used to synchronously write new data to CosmosDB. + /// An ILogger that is used to write informational log messages. + [FunctionName(nameof(PubSubTriggerMessageWriteThrough))] + public static void Run( + [RedisPubSubTrigger(RedisConnectionSetting, PubSubChannelSetting)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: PubSubContainerSetting, + Connection = CosmosDbConnectionSetting)]out dynamic cosmosDBOut, + ILogger logger) + { + //assign the message from Redis to a dynamic object that will be written to Cosmos DB + cosmosDBOut = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + logger.LogInformation($"Message: \"{cosmosDBOut.message}\" from Channel: \"{cosmosDBOut.channel}\" stored in Cosmos DB container: \"{Environment.GetEnvironmentVariable(PubSubContainerSetting.Replace("%", ""))}\" with id: \"{cosmosDBOut.id}\""); + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerWriteThrough.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerWriteThrough.cs new file mode 100644 index 0000000..b892f81 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/PubSubTriggerWriteThrough.cs @@ -0,0 +1,50 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteThrough +{ + public static class PubSubTriggerWriteThrough + { + //Connection string settings that will be resolved from local.settings.json file + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + + //Cosmos DB settings that will be resolved from local.settings.json file + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%PubSubCosmosDbContainerId%"; + public const string PubSubContainerSetting = "%MessagesCosmosDbContainerId%"; + public const string PubSubChannelSetting = "%PubSubChannel%"; + + private static readonly IDatabase s_redisDb = + ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(RedisConnectionSetting)).GetDatabase(); + + + /// + /// Triggers when Redis keys are added or updated and synchronously writes the key/value pair to Cosmos DB. + /// + /// The key that has been added or changed in Redis. + /// A dynamic object that is used to synchronously write new data to CosmosDB. + /// An ILogger that is used to write informational log messages. + [FunctionName(nameof(PubSubTriggerWriteThrough))] + public static void Run( + [RedisPubSubTrigger(RedisConnectionSetting, "__keyevent@0__:set")] string newKey, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: ContainerSetting, + Connection = CosmosDbConnectionSetting)]out dynamic cosmosDBOut, + ILogger logger) + { + //assign the data from Redis to a dynamic object that will be written to Cosmos DB + cosmosDBOut = new RedisData( + id: Guid.NewGuid().ToString(), + key: newKey, + value: s_redisDb.StringGet(newKey), + timestamp: DateTime.UtcNow + ); + + logger.LogInformation($"Key: \"{newKey}\", Value: \"{cosmosDBOut.value}\" addedd to Cosmos DB container: \"{Environment.GetEnvironmentVariable(ContainerSetting.Replace("%",""))}\" at id: \"{cosmosDBOut.id}\""); + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerSingleDocumentWriteThrough.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerSingleDocumentWriteThrough.cs new file mode 100644 index 0000000..8894516 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerSingleDocumentWriteThrough.cs @@ -0,0 +1,64 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteThrough +{ + internal class StreamTriggerSingleDocumentWriteThrough + { + // Redis connection string and stream names stored in local.settings.json + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string StreamName = "%StreamTest%"; + + // CosmosDB connection string, client, database name and container name stored in local.settings.json + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + private static CosmosClient _cosmosDbClient = new( + connectionString: Environment.GetEnvironmentVariable(CosmosDbConnectionSetting)!); + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%StreamCosmosDbContainerId%"; + public const string ContainerSettingSingleDocument = "%StreamCosmosDbContainerIdSingleDocument%"; + + /// + /// Write through (Single Document): Write messages to a single document in CosmosDB synchronously whenever a new value is added to the Redis Stream. + /// + /// The message which has gone through the stream. Includes message id alongside the key/value pairs + /// Container for where the CosmosDB items are stored + /// ILogger used to write key information + [FunctionName(nameof(StreamTriggerSingleDocumentWriteThrough))] + public static void Run( + [RedisStreamTrigger(RedisConnectionSetting, StreamName)] StreamEntry entry, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: ContainerSettingSingleDocument, + Connection = CosmosDbConnectionSetting)] ICollector items, + ILogger logger) + { + string stream = Environment.GetEnvironmentVariable(StreamName.Replace("%", "")); + + // Connect CosmosDB container + Container cosmosDbContainer = _cosmosDbClient.GetContainer(Environment.GetEnvironmentVariable(DatabaseSetting.Replace("%", "")), Environment.GetEnvironmentVariable(ContainerSettingSingleDocument.Replace("%", ""))); + + // Query CosmosDB database by the stream name + StreamDataSingleDocument results = cosmosDbContainer.GetItemLinqQueryable(true) + .Where(b => b.id == stream) + .AsEnumerable() + .FirstOrDefault(); + + if (results == null) + { + // If the stream does not exist in CosmosDB, create a new entry and insert into CosmosDB synchronously + StreamDataSingleDocument data = StreamDataSingleDocument.CreateNewEntry(entry, stream, logger); + items.Add(data); + } + else + { + // If the stream exists in CosmosDB, add to the existing entry and insert into CosmosDB synchronously + StreamDataSingleDocument data = StreamDataSingleDocument.UpdateExistingEntry(results, entry, logger); + items.Add(data); + } + } + } +} diff --git a/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerWriteThrough.cs b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerWriteThrough.cs new file mode 100644 index 0000000..f28a616 --- /dev/null +++ b/samples/CosmosDBIntegration/ReadThroughSamples/WriteThrough/StreamTriggerWriteThrough.cs @@ -0,0 +1,41 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.Models; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB.WriteThrough +{ + internal class StreamTriggerWriteThrough + { + // Redis connection string and stream names stored in local.settings.json + public const string RedisConnectionSetting = "RedisConnectionString"; + public const string StreamName = "%StreamTest%"; + + // CosmosDB connection string, client, database name and container name stored in local.settings.json + public const string CosmosDbConnectionSetting = "CosmosDbConnectionString"; + private static CosmosClient _cosmosDbClient = new( + connectionString: Environment.GetEnvironmentVariable(CosmosDbConnectionSetting)!); + public const string DatabaseSetting = "%CosmosDbDatabaseId%"; + public const string ContainerSetting = "%StreamCosmosDbContainerId%"; + + /// + /// Write through: Write messages to CosmosDB synchronously whenever a new value is added to the Redis Stream. Each message will get it's own document. + /// + /// The message which has gone through the stream. Includes message id alongside the key/value pairs + /// Container for where the CosmosDB items are stored + /// ILogger used to write key information + [FunctionName(nameof(StreamTriggerWriteThrough))] + public static void Run( + [RedisStreamTrigger(RedisConnectionSetting, StreamName)] StreamEntry entry, + [CosmosDB( + databaseName: DatabaseSetting, + containerName: ContainerSetting, + Connection = CosmosDbConnectionSetting)] ICollector items, + ILogger logger) + { + // Insert data into CosmosDB synchronously + items.Add(StreamData.Format(entry, logger)); + } + } +} diff --git a/samples/CosmosDBIntegration/Models/RedisData.cs b/samples/CosmosDBIntegration/RedisData.cs similarity index 84% rename from samples/CosmosDBIntegration/Models/RedisData.cs rename to samples/CosmosDBIntegration/RedisData.cs index f404231..3ffc8b1 100644 --- a/samples/CosmosDBIntegration/Models/RedisData.cs +++ b/samples/CosmosDBIntegration/RedisData.cs @@ -4,7 +4,7 @@ using System.Collections.Generic; using System.Linq; -namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.Models +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Samples.CosmosDB { public class StreamData { @@ -17,7 +17,7 @@ public static StreamData Format(StreamEntry entry, ILogger logger) logger.LogInformation("ID: {val}", entry.Id.ToString()); // Map each key value pair - Dictionary dict = RedisUtilities.StreamEntryToDictionary(entry); + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); StreamData sampleItem = new StreamData { id = entry.Id, values = dict }; return sampleItem; @@ -35,7 +35,7 @@ public static StreamDataSingleDocument CreateNewEntry(StreamEntry entry, string logger.LogInformation("Creating a new document for {val}. Inserting ID: {val} as the first entry", streamName, entry.Id.ToString()); // Map each key value pair - Dictionary dict = RedisUtilities.StreamEntryToDictionary(entry); + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); // Create a new list of messages var list = new Dictionary>(); @@ -50,7 +50,7 @@ public static StreamDataSingleDocument UpdateExistingEntry(StreamDataSingleDocum logger.LogInformation("Adding to {val} document. Inserting ID: {val} ", results.id, entry.Id.ToString()); // Map each key value pair - Dictionary dict = RedisUtilities.StreamEntryToDictionary(entry); + Dictionary dict = entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()); // Update list of messages var list = results.messages; diff --git a/samples/CosmosDBIntegration/WriteAroundSamples/WriteAroundSamples.csproj b/samples/CosmosDBIntegration/WriteAroundSamples/WriteAroundSamples.csproj index 6159f50..e7d85aa 100644 --- a/samples/CosmosDBIntegration/WriteAroundSamples/WriteAroundSamples.csproj +++ b/samples/CosmosDBIntegration/WriteAroundSamples/WriteAroundSamples.csproj @@ -10,7 +10,7 @@ - + diff --git a/samples/CosmosDBIntegration/WriteBehindSamples/WriteBehindSamples.csproj b/samples/CosmosDBIntegration/WriteBehindSamples/WriteBehindSamples.csproj index 074311a..6e457a0 100644 --- a/samples/CosmosDBIntegration/WriteBehindSamples/WriteBehindSamples.csproj +++ b/samples/CosmosDBIntegration/WriteBehindSamples/WriteBehindSamples.csproj @@ -10,7 +10,7 @@ - + diff --git a/samples/CosmosDBIntegration/WriteThroughSamples/WriteThroughSamples.csproj b/samples/CosmosDBIntegration/WriteThroughSamples/WriteThroughSamples.csproj index b554137..85e38a1 100644 --- a/samples/CosmosDBIntegration/WriteThroughSamples/WriteThroughSamples.csproj +++ b/samples/CosmosDBIntegration/WriteThroughSamples/WriteThroughSamples.csproj @@ -10,7 +10,7 @@ - +