diff --git a/test/dotnet/Integration/CustomType.cs b/test/dotnet/Integration/CustomType.cs index 97365e4..f23e06f 100644 --- a/test/dotnet/Integration/CustomType.cs +++ b/test/dotnet/Integration/CustomType.cs @@ -1,3 +1,5 @@ +using System; + namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration { public class CustomType @@ -6,4 +8,18 @@ public class CustomType public string Field { get; set; } public string Random { get; set; } } + + public record RedisData( + string id, + string key, + string value, + DateTime timestamp + ); + + public record PubSubData( + string id, + string channel, + string message, + DateTime timestamp + ); } diff --git a/test/dotnet/Integration/IntegrationTestHelpers.cs b/test/dotnet/Integration/IntegrationTestHelpers.cs index 54d85ac..d5ab3a1 100644 --- a/test/dotnet/Integration/IntegrationTestHelpers.cs +++ b/test/dotnet/Integration/IntegrationTestHelpers.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Configuration; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Configuration; using Newtonsoft.Json; using System; using System.Collections.Generic; @@ -104,5 +105,17 @@ internal static string GetLogValue(object value) { return value.GetType().FullName + ":" + JsonConvert.SerializeObject(value); } + + internal static void ClearDataFromCosmosDb(string databaseName, string containerName) + { + using CosmosClient client = new(RedisUtilities.ResolveConnectionString(localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting)); + var container = client.GetContainer(databaseName, containerName); + var items = container.GetItemLinqQueryable(allowSynchronousQueryExecution: true); + + foreach (var item in items) + { + container.DeleteItemAsync(item.id, PartitionKey.None).Wait(); + } + } } } diff --git a/test/dotnet/Integration/PubSubCosmosIntegrationTestFunctions.cs b/test/dotnet/Integration/PubSubCosmosIntegrationTestFunctions.cs new file mode 100644 index 0000000..5121e4d --- /dev/null +++ b/test/dotnet/Integration/PubSubCosmosIntegrationTestFunctions.cs @@ -0,0 +1,296 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.Cosmos.Linq; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration +{ + internal class PubSubCosmosIntegrationTestFunctions + { + //resolved from local.settings.json + public const string localhostSetting = "RedisConnectionString"; + public const string cosmosDbConnectionSetting = "CosmosDBConnectionString"; + public const string cosmosDbDatabaseSetting = "CosmosDbDatabaseId"; + public const string cosmosDbContainerSetting = "CosmosDbContainerId"; + public const string pubSubContainerSetting = "PubSubContainerId"; + + public const string pubsubChannel = "testChannel"; + public const string pubsubMultiple = "testChannel*"; + public const string keyspaceChannel = "__keyspace@0__:testKey"; + public const string keyspaceMultiple = "__keyspace@0__:testKey*"; + public const string keyeventChannelSet = "__keyevent@0__:set"; + public const string keyeventChannelAll = "__keyevent@0__:*"; + public const string keyspaceChannelAll = "__keyspace@0__:*"; + public const string allChannels = "*"; + + + private static readonly IDatabase s_redisDb = + ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, localhostSetting)).GetDatabase(); + + private static readonly Lazy s_readRedisDb = new Lazy(() => + ConnectionMultiplexer.ConnectAsync(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, localhostSetting)).Result.GetDatabase()); + + // PubSubWrite -- Tests + [FunctionName(nameof(SingleChannelWriteBehind))] + public static async Task SingleChannelWriteBehind( + [RedisPubSubTrigger(localhostSetting, pubsubChannel)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]IAsyncCollector cosmosOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + PubSubData redisData = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + await cosmosOut.AddAsync(redisData); + logger.LogInformation($"Message: \"{redisData.message}\" from Channel: \"{redisData.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{redisData.id}\""); + } + + [FunctionName(nameof(MultipleChannelWriteBehind))] + public static async Task MultipleChannelWriteBehind( + [RedisPubSubTrigger(localhostSetting, pubsubMultiple)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]IAsyncCollector cosmosOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + PubSubData redisData = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + await cosmosOut.AddAsync(redisData); + logger.LogInformation($"Message: \"{redisData.message}\" from Channel: \"{redisData.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{redisData.id}\""); + } + + [FunctionName(nameof(AllChannelsWriteBehind))] + public static async Task AllChannelsWriteBehind( + [RedisPubSubTrigger(localhostSetting, allChannels)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]IAsyncCollector cosmosOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + PubSubData redisData = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + await cosmosOut.AddAsync(redisData); + logger.LogInformation($"Message: \"{redisData.message}\" from Channel: \"{redisData.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{redisData.id}\""); + } + + [FunctionName(nameof(SingleChannelWriteThrough))] + public static void SingleChannelWriteThrough( + [RedisPubSubTrigger(localhostSetting, pubsubChannel)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]out dynamic cosmosDBOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + cosmosDBOut = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + logger.LogInformation($"Message: \"{cosmosDBOut.message}\" from Channel: \"{cosmosDBOut.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{cosmosDBOut.id}\""); + } + + [FunctionName(nameof(MultipleChannelWriteThrough))] + public static void MultipleChannelWriteThrough( + [RedisPubSubTrigger(localhostSetting, pubsubMultiple)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]out dynamic cosmosDBOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + cosmosDBOut = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + logger.LogInformation($"Message: \"{cosmosDBOut.message}\" from Channel: \"{cosmosDBOut.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{cosmosDBOut.id}\""); + } + + [FunctionName(nameof(AllChannelsWriteThrough))] + public static void AllChannelsWriteThrough( + [RedisPubSubTrigger(localhostSetting, allChannels)] ChannelMessage pubSubMessage, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting)]out dynamic cosmosDBOut, + ILogger logger) + { + //create a PubSubData object from the pubsub message + cosmosDBOut = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: pubSubMessage.Channel, + message: pubSubMessage.Message, + timestamp: DateTime.UtcNow + ); + + //write the PubSubData object to Cosmos DB + logger.LogInformation($"Message: \"{cosmosDBOut.message}\" from Channel: \"{cosmosDBOut.channel}\" stored in Cosmos DB container: \"{"PSContainerId"}\" with id: \"{cosmosDBOut.id}\""); + } + + + //write-through -- Tests + [FunctionName(nameof(WriteThrough))] + public static void WriteThrough( + [RedisPubSubTrigger(localhostSetting, "__keyevent@0__:set")] string newKey, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "ContainerId", + Connection = cosmosDbConnectionSetting)]out dynamic redisData, + ILogger logger) + { + //assign the data from Redis to a dynamic object that will be written to Cosmos DB + redisData = new RedisData( + id: Guid.NewGuid().ToString(), + key: newKey, + value: s_redisDb.StringGet(newKey), + timestamp: DateTime.UtcNow + ); + + logger.LogInformation($"Key: \"{newKey}\", Value: \"{redisData.value}\" addedd to Cosmos DB container: \"{"ContainerId"}\" at id: \"{redisData.id}\""); + } + + //Write-Behind -- Tests + [FunctionName(nameof(WriteBehindAsync))] + public static async Task WriteBehindAsync( + [RedisPubSubTrigger(localhostSetting, "__keyevent@0__:set")] string newKey, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "ContainerId", + Connection = cosmosDbConnectionSetting)]IAsyncCollector cosmosOut, + ILogger logger) + { + //load data from Redis into a record + RedisData redisData = new RedisData( + id: Guid.NewGuid().ToString(), + key: newKey, + value: await s_redisDb.StringGetAsync(newKey), + timestamp: DateTime.UtcNow + ); + + //write the record to Cosmos DB + await cosmosOut.AddAsync(redisData); + logger.LogInformation($"Key: \"{newKey}\", Value: \"{redisData.value}\" added to Cosmos DB container: \"{"ContainerId"}\" at id: \"{redisData.id}\""); + } + + //Write-Around -- Tests + [FunctionName(nameof(WriteAroundAsync))] + public static async Task WriteAroundAsync([CosmosDBTrigger( + databaseName: "DatabaseId", + containerName: "ContainerId", + Connection = cosmosDbConnectionSetting, + LeaseContainerName = "leases", LeaseContainerPrefix = "Write-Around-")]IReadOnlyList input, + ILogger logger) + { + //if the list is empty, return + if (input == null || input.Count <= 0) return; + + //for each item upladed to cosmos, write it to Redis + foreach (var document in input) + { + //if the key/value pair is already in Redis, throw an exception + if (await s_redisDb.StringGetAsync(document.key) == document.value) + { + throw new Exception($"ERROR: Key: \"{document.key}\", Value: \"{document.value}\" pair is already in Azure Redis Cache."); + } + //Write the key/value pair to Redis + await s_redisDb.StringSetAsync(document.key, document.value); + logger.LogInformation($"Key: \"{document.key}\", Value: \"{document.value}\" added to Redis."); + } + } + + //Write-Around Message caching -- Tests + [FunctionName(nameof(WriteAroundMessageAsync))] + public static async Task WriteAroundMessageAsync( + [CosmosDBTrigger( + databaseName: "DatabaseId", + containerName: "PSContainerId", + Connection = cosmosDbConnectionSetting, + LeaseContainerName = "leases", LeaseContainerPrefix = "Write-Around-")]IReadOnlyList cosmosData, + ILogger logger) + { + //if the list is empty, return + if (cosmosData == null || cosmosData.Count <= 0) return; + + //for each new item upladed to cosmos, publish to Redis + foreach (var document in cosmosData) + { + //publish the message to the correct Redis channel + await s_redisDb.Multiplexer.GetSubscriber().PublishAsync(document.channel, document.message); + logger.LogInformation($"Message: \"{document.message}\" has been published on channel: \"{document.channel}\"."); + } + } + + //Read-Through -- Tests + [FunctionName(nameof(ReadThroughAsync))] + public static async Task ReadThroughAsync( + [RedisPubSubTrigger(localhostSetting, "__keyevent@0__:keymiss")] string missedkey, + [CosmosDB( + databaseName: "DatabaseId", + containerName: "ContainerId", + Connection = cosmosDbConnectionSetting)]CosmosClient client, + ILogger logger) + { + //get the Cosmos DB database and the container to read from + Container cosmosContainer = client.GetContainer("DatabaseId", "ContainerId"); + var queryable = cosmosContainer.GetItemLinqQueryable(); + + //get all entries in the container that contain the missed key + using FeedIterator feed = queryable + .Where(p => p.key == missedkey) + .OrderByDescending(p => p.timestamp) + .ToFeedIterator(); + var response = await feed.ReadNextAsync(); + + //if the key is found in Cosmos DB, add the most recently updated to Redis + var item = response.FirstOrDefault(defaultValue: null); + if (item != null) + { + await s_readRedisDb.Value.StringSetAsync(item.key, item.value); + logger.LogInformation($"Key: \"{item.key}\", Value: \"{item.value}\" added to Redis."); + } + else + { + //if the key isnt found in Cosmos DB, throw an exception + throw new Exception($"ERROR: Key: \"{missedkey}\" not found in Redis or Cosmos DB. Try adding the Key-Value pair to Redis or Cosmos DB."); + } + } + } +} diff --git a/test/dotnet/Integration/PubSubCosmosIntegrationTests.cs b/test/dotnet/Integration/PubSubCosmosIntegrationTests.cs new file mode 100644 index 0000000..182060a --- /dev/null +++ b/test/dotnet/Integration/PubSubCosmosIntegrationTests.cs @@ -0,0 +1,392 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using FakeItEasy.Sdk; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.Cosmos.Linq; +using StackExchange.Redis; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration +{ + [Collection("PubSubTriggerTests")] + public class PubSubCosmosIntegrationTests + { + + [Theory] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.SingleChannelWriteBehind), PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testValue single")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.SingleChannelWriteBehind), PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testValue multi")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.MultipleChannelWriteBehind), PubSubCosmosIntegrationTestFunctions.pubsubChannel + "suffix", "testSuffix multi")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteBehind), PubSubCosmosIntegrationTestFunctions.pubsubChannel + "suffix", "testSuffix all")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteBehind), "prefix" + PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testPrefix all")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteBehind), "separate", "testSeparate all")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.SingleChannelWriteThrough), PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testValue single")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.SingleChannelWriteThrough), PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testValue multi")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.MultipleChannelWriteThrough), PubSubCosmosIntegrationTestFunctions.pubsubChannel + "suffix", "testSuffix multi")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteThrough), PubSubCosmosIntegrationTestFunctions.pubsubChannel + "suffix", "testSuffix all")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteThrough), "prefix" + PubSubCosmosIntegrationTestFunctions.pubsubChannel, "testPrefix all")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.AllChannelsWriteThrough), "separate", "testSeparate all")] + public async void PubSubMessageWrite_SuccessfullyWritesToCosmos(string functionName, string channel, string message) + { + //start the function trigger and publish a message to the specified pubsub channel + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7079)) + { + ISubscriber subscriber = multiplexer.GetSubscriber(); + + subscriber.Publish(channel, message); + await Task.Delay(TimeSpan.FromSeconds(1)); + + await multiplexer.CloseAsync(); + functionsProcess.Kill(); + }; + + //Query Cosmos DB for the message that was published + string cosmosMessage = null; + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + { + var db = client.GetContainer("DatabaseId", "PSContainerId"); + var queryable = db.GetItemLinqQueryable(); + + //get all entries in the container that contain the correct channel + using FeedIterator feed = queryable + .Where(p => p.channel == channel) + .OrderByDescending(p => p.timestamp) + .ToFeedIterator(); + var response = await feed.ReadNextAsync(); + var item = response.FirstOrDefault(defaultValue: null); + cosmosMessage = item?.message; + } + //check that the message was stored in Cosmos DB as expected, then clear the container + Assert.True(message == cosmosMessage, $"Expected \"{message}\" but got \"{cosmosMessage}\""); + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "PSContainerId"); + } + + + [Theory] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey-1", "testValue1")] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey-2", "testValue2")] + public async void RedisToCosmos_SuccessfullyWritesToCosmos(string functionName, string key, string value) + { + string keyFromCosmos = null; + string valueFromCosmos = null; + //start the function trigger and set a new key/value pair in Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7079)) + { + var redisDb = multiplexer.GetDatabase(); + await redisDb.StringSetAsync(key, value); + await Task.Delay(TimeSpan.FromSeconds(5)); + + + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + { + var cosmosDb = client.GetContainer("DatabaseId", "ContainerId"); + var queryable = cosmosDb.GetItemLinqQueryable(); + + //get all entries in the container that contain the correct key + using FeedIterator feed = queryable + .Where(p => p.key == key) + .OrderByDescending(p => p.timestamp) + .ToFeedIterator(); + var response = await feed.ReadNextAsync(); + await Task.Delay(TimeSpan.FromSeconds(3)); + + var item = response.FirstOrDefault(defaultValue: null); + + keyFromCosmos = item?.key; + valueFromCosmos = item?.value; + }; + //delete the key from Redis and close the function trigger + await redisDb.KeyDeleteAsync(key); + functionsProcess.Kill(); + }; + //Check that the key and value stored in Cosmos DB match the key and value that were set in Redis + Assert.True(keyFromCosmos == key, $"Expected \"{key}\" but got \"{keyFromCosmos}\""); + Assert.True(valueFromCosmos == value, $"Expected \"{value}\" but got \"{valueFromCosmos}\""); + //clear the Cosmos DB container + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "ContainerId"); + } + + [Fact] + public async void WriteAround_SuccessfullyWritesToRedis() + { + //start the function trigger and add a new key value pair entry to Cosmos DB + string functionName = nameof(PubSubCosmosIntegrationTestFunctions.WriteAroundAsync); + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7081)) + { + Container cosmosContainer = client.GetContainer("DatabaseId", "ContainerId"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + RedisData redisData = new RedisData( + id: Guid.NewGuid().ToString(), + key: "cosmosKey", + value: "cosmosValue", + timestamp: DateTime.UtcNow + ); + + await cosmosContainer.CreateItemAsync(redisData); + await Task.Delay(TimeSpan.FromSeconds(10)); + client.Dispose(); + functionsProcess.Kill(); + + } + + //check that the key value pair was added to Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + { + var redisValue = await multiplexer.GetDatabase().StringGetAsync("cosmosKey"); + await Task.Delay(TimeSpan.FromSeconds(10)); + Assert.Equal("cosmosValue", redisValue); + //await multiplexer.GetDatabase().KeyDeleteAsync("cosmosKey"); + // await Task.Delay(TimeSpan.FromSeconds(3)); + await multiplexer.CloseAsync(); + } + //clear the Cosmos DB container + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "ContainerId"); + } + + + [Fact] + public async void WriteAroundMessage_SuccessfullyPublishesToRedis() + { + //start the function trigger and connect to Redis and Cosmos DB + string functionName = nameof(PubSubCosmosIntegrationTestFunctions.WriteAroundMessageAsync); + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7081)) + { + //create a subscriber for the channel that will be published to. + //once a message is published, this lambda function will check that the message from Cosmos DB is correct in Redis + ISubscriber subscriber = multiplexer.GetSubscriber(); + subscriber.Subscribe("PubSubChannel", (channel, message) => + { + Assert.Equal("PubSubMessage", message); + }); + + //add a document to Cosmos DB containing the pubsub channel and message + Container cosmosContainer = client.GetContainer("DatabaseId", "PSContainerId"); + PubSubData redisData = new PubSubData( + id: Guid.NewGuid().ToString(), + channel: "PubSubChannel", + message: "PubSubMessage", + timestamp: DateTime.UtcNow + ); + + await cosmosContainer.CreateItemAsync(redisData); + await Task.Delay(TimeSpan.FromSeconds(5)); + client.Dispose(); + functionsProcess.Kill(); + await multiplexer.CloseAsync(); + } + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "PSContainerId"); + } + + + [Fact] + public async void ReadThrough_SuccessfullyWritesToRedis() + { + //Add a key value pair to Cosmos DB that will be read by the function trigger and written to Redis + string functionName = nameof(PubSubCosmosIntegrationTestFunctions.ReadThroughAsync); + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + { + Container cosmosContainer = client.GetContainer("DatabaseId", "ContainerId"); + RedisData redisData = new RedisData( + id: Guid.NewGuid().ToString(), + key: "cosmosKey1", + value: "cosmosValue1", + timestamp: DateTime.UtcNow + ); + await cosmosContainer.UpsertItemAsync(redisData); + await Task.Delay(TimeSpan.FromSeconds(2)); + client.Dispose(); + } + + //start the function trigger and connect to Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7082)) + { + //Attempt to get the key from Redis. If the key is found, the test fails. If the key is not found, our function trigger executes and writes the key to Redis + var redisValue = await multiplexer.GetDatabase().StringGetAsync("cosmosKey1"); + Assert.True(redisValue.IsNull, userMessage: "Key already in Redis Cache, test failed"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + //check that the function trigger worked as expected and the key value pair was added to Redis + redisValue = await multiplexer.GetDatabase().StringGetAsync("cosmosKey1"); + await Task.Delay(TimeSpan.FromSeconds(3)); + Assert.Equal("cosmosValue1", redisValue); + + //clean up the cache and stop the function trigger + await multiplexer.GetDatabase().KeyDeleteAsync("cosmosKey1"); + await Task.Delay(TimeSpan.FromSeconds(2)); + await multiplexer.CloseAsync(); + functionsProcess.Kill(); + } + //clear the Cosmos DB container + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "ContainerId"); + } + + [Fact] + public async void ReadThrough_UnsuccessfulWhenKeyNotFoundInCosmos() + { + string functionName = nameof(PubSubCosmosIntegrationTestFunctions.ReadThroughAsync); + + //Dictionary to store the expected output from the function trigger + //if the output matches, the value is decmented + Dictionary counts = new Dictionary + { + { $"Executed '{functionName}' (Failed", 1}, + { $"ERROR: Key: \"unknownKey\" not found in Redis or Cosmos DB. Try adding the Key-Value pair to Redis or Cosmos DB.", 1}, + }; + + //start the function trigger and connect to Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7080)) + { + //subscribe to the function trigger's output stream and check that the output matches the expected output + functionsProcess.OutputDataReceived += IntegrationTestHelpers.CounterHandlerCreator(counts); + + //Attempt to get the key from Redis. If the key is found, the test fails. If the key is not found the line above decrements the values in the dictionary + var redisValue = await multiplexer.GetDatabase().StringGetAsync("unknownKey"); + Assert.True(redisValue.IsNull, userMessage: "Key already in Redis Cache, test failed"); + + await Task.Delay(TimeSpan.FromSeconds(1)); + //check that the function trigger worked as expected and the key value pair was not added to Redis + Assert.True(redisValue.IsNull); + + //check that the expected output was written to the function trigger's output stream + var incorrect = counts.Where(pair => pair.Value != 0); + Assert.False(incorrect.Any(), JsonSerializer.Serialize(incorrect)); + + await multiplexer.CloseAsync(); + } + } + + [Theory] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 10)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 100)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 1000)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 10)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 100)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 1000)] + public async void RedisToCosmos_MultipleWritesSuccessfully(string functionName, string key, string value, int numberOfWrites) + { + string keyFromCosmos = null; + string valueFromCosmos = null; + //start the function trigger and connect to Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7072)) + { + var redisDb = multiplexer.GetDatabase(); + + //iterate through the number of writes specified in the test case and write the key value pair to Redis. Then check that the key value pair was written to Cosmos before moving on the the next itteration. + for (int i = 1; i <= numberOfWrites; i++) + { + await redisDb.StringSetAsync(key + "-" + i, value + "-" + i); + + //query Cosmos DB for the key value pair written to Redis + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + { + var cosmosDb = client.GetContainer("DatabaseId", "ContainerId"); + var queryable = cosmosDb.GetItemLinqQueryable(); + + //get all entries in the container that contain the missed key + using FeedIterator feed = queryable + .Where(p => p.key == key + "-"+ i) + .OrderByDescending(p => p.timestamp) + .ToFeedIterator(); + var response = await feed.ReadNextAsync(); + //await Task.Delay(TimeSpan.FromSeconds(3)); + + var item = response.FirstOrDefault(defaultValue: null); + + keyFromCosmos = item?.key; + valueFromCosmos = item?.value; + }; + //check that the key value pair was written to Cosmos DB + Assert.True(keyFromCosmos == key + "-" + i, $"Expected \"{key + "-" + i}\" but got \"{keyFromCosmos}\""); + Assert.True(valueFromCosmos == value + "-" + i, $"Expected \"{value + "-" + i}\" but got \"{valueFromCosmos}\""); + } + + //Delete all key value pairs from Redis and Cosmos DB + for (int i = 1; i <= numberOfWrites; i++) + { + await redisDb.KeyDeleteAsync(key + "-" + i); + } + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "ContainerId"); + functionsProcess.Kill(); + }; + } + [Theory] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 10)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 100)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteThrough), "testKey1", "testValue1", 1000)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 10)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 100)] + [InlineData(nameof(PubSubCosmosIntegrationTestFunctions.WriteBehindAsync), "testKey2", "testValue2", 1000)] + public async void RedisToCosmos_MultipleWritesSuccessfullyV2(string functionName, string key, string value, int numberOfWrites) + { + string keyFromCosmos = null; + string valueFromCosmos = null; + //start the function trigger and connect to Redis + using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.localhostSetting))) + using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7079)) + { + var redisDb = multiplexer.GetDatabase(); + + //iterate through the number of writes specified in the test case and write all the key value pairs to Redis. + for (int i = 1; i <= numberOfWrites; i++) + { + await redisDb.StringSetAsync(key + "-" + i, value + "-" + i); + + //await Task.Delay(TimeSpan.FromSeconds(1)); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + //itterate through each key value pair written to Redis and Query Cosmos DB for that entry + using (CosmosClient client = new CosmosClient(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, PubSubCosmosIntegrationTestFunctions.cosmosDbConnectionSetting))) + { + var cosmosDb = client.GetContainer("DatabaseId", "ContainerId"); + var queryable = cosmosDb.GetItemLinqQueryable(); + for (int i = 1; i <= numberOfWrites; i++) + { + //get all entries in the container that contain the missed key + using FeedIterator feed = queryable + .Where(p => p.key == key + "-" + i) + .OrderByDescending(p => p.timestamp) + .ToFeedIterator(); + var response = await feed.ReadNextAsync(); + //await Task.Delay(TimeSpan.FromSeconds(2)); + + var item = response.FirstOrDefault(defaultValue: null); + + if(item == null) + { + await Task.Delay(TimeSpan.FromSeconds(5)); + + item = response.FirstOrDefault(defaultValue: null); + } + keyFromCosmos = item?.key; + valueFromCosmos = item?.value; + + //check that the key value pair was written to Cosmos DB and that the key value pair written to Redis matches the key value pair written to Cosmos DB + Assert.True(keyFromCosmos == key + "-" + i, $"Expected \"{key + "-" + i}\" but got \"{keyFromCosmos}\""); + Assert.True(valueFromCosmos == value + "-" + i, $"Expected \"{value + "-" + i}\" but got \"{valueFromCosmos}\""); + } + }; + + //Delete all key value pairs from Redis and Cosmos DB + for (int i = 1; i <= numberOfWrites; i++) + { + await redisDb.KeyDeleteAsync(key + "-" + i); + } + IntegrationTestHelpers.ClearDataFromCosmosDb("DatabaseId", "ContainerId"); + functionsProcess.Kill(); + }; + } + } +} diff --git a/test/dotnet/local.settings.json b/test/dotnet/local.settings.json index feabf63..f57249d 100644 --- a/test/dotnet/local.settings.json +++ b/test/dotnet/local.settings.json @@ -2,9 +2,14 @@ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", - "FUNCTIONS_WORKER_RUNTIME": "dotnet" + "FUNCTIONS_WORKER_RUNTIME": "dotnet", + "CosmosDbDatabaseId": "DatabaseId", + "CosmosDbContainerId": "ContainerId", + "PubSubContainerId": "PSContainerId" }, "ConnectionStrings": { - "redisLocalhost": "127.0.0.1:6379" + "redisLocalhost": "127.0.0.1:6379", + "RedisConnectionString": ".redis.cache.windows.net:6380,password=,ssl=True,abortConnect=False,tiebreaker=", + "CosmosDbConnectionString": "AccountEndpoint=https://.documents.azure.com:443/;AccountKey=;" } } \ No newline at end of file