Skip to content

Commit 048433d

Browse files
authored
Add streams-based trigger, include new tests that ensure multiple functions instances don't duplicate events
1 parent 39bf49c commit 048433d

9 files changed

+441
-53
lines changed

Diff for: samples/RedisSamples.cs

+17
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,22 @@ public static void ListsMultipleTrigger(
4646
{
4747
logger.LogInformation(JsonSerializer.Serialize(model));
4848
}
49+
50+
[FunctionName(nameof(StreamsTrigger))]
51+
public static void StreamsTrigger(
52+
[RedisStreamsTrigger(ConnectionString = localhost, Keys = "streamTest")] RedisMessageModel model,
53+
ILogger logger)
54+
{
55+
logger.LogInformation(JsonSerializer.Serialize(model));
56+
}
57+
58+
[FunctionName(nameof(StreamsMultipleTriggers))]
59+
public static void StreamsMultipleTriggers(
60+
[RedisStreamsTrigger(ConnectionString = localhost, Keys = "streamTest1 streamTest2")] RedisMessageModel model,
61+
ILogger logger)
62+
{
63+
logger.LogInformation(JsonSerializer.Serialize(model));
64+
}
65+
4966
}
5067
}

Diff for: src/RedisExtensionConfigProvider.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ public void Initialize(ExtensionConfigContext context)
3232
}
3333

3434
#pragma warning disable CS0618
35-
FluentBindingRule<RedisPubSubTriggerAttribute> rule = context.AddBindingRule<RedisPubSubTriggerAttribute>();
36-
rule.BindToTrigger<RedisMessageModel>(new RedisPubSubTriggerBindingProvider(configuration));
35+
FluentBindingRule<RedisPubSubTriggerAttribute> pubsubTriggerRule = context.AddBindingRule<RedisPubSubTriggerAttribute>();
36+
pubsubTriggerRule.BindToTrigger<RedisMessageModel>(new RedisPubSubTriggerBindingProvider(configuration));
3737

3838
FluentBindingRule<RedisListsTriggerAttribute> listsTriggerRule = context.AddBindingRule<RedisListsTriggerAttribute>();
3939
listsTriggerRule.BindToTrigger<RedisMessageModel>(new RedisListsTriggerBindingProvider(configuration));
40+
41+
FluentBindingRule<RedisStreamsTriggerAttribute> streamsTriggerRule = context.AddBindingRule<RedisStreamsTriggerAttribute>();
42+
streamsTriggerRule.BindToTrigger<RedisMessageModel>(new RedisStreamsTriggerBindingProvider(configuration));
4043
#pragma warning restore CS0618
4144
}
4245
}

Diff for: src/StreamsTrigger/RedisStreamsListener.cs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
using System;
2+
using System.Linq;
3+
using System.Text.Json;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.WebJobs.Host.Executors;
7+
using StackExchange.Redis;
8+
9+
10+
namespace Microsoft.Azure.WebJobs.Extensions.Redis
11+
{
12+
/// <summary>
13+
/// Responsible for managing connections and listening to a given Azure Redis Cache.
14+
/// </summary>
15+
internal sealed class RedisStreamsListener : RedisPollingListenerBase
16+
{
17+
internal bool deleteAfterProcess;
18+
internal string consumerGroup;
19+
internal StreamPosition[] positions;
20+
internal string consumerName;
21+
22+
public RedisStreamsListener(string connectionString, string keys, TimeSpan pollingInterval, int messagesPerWorker, int batchSize, string consumerGroup, bool deleteAfterProcess, ITriggeredFunctionExecutor executor)
23+
: base(connectionString, keys, pollingInterval, messagesPerWorker, batchSize, executor)
24+
{
25+
this.consumerGroup = consumerGroup;
26+
this.deleteAfterProcess = deleteAfterProcess;
27+
this.positions = this.keys.Select((key) => new StreamPosition(key, StreamPosition.NewMessages)).ToArray();
28+
this.consumerName = Guid.NewGuid().ToString();
29+
}
30+
31+
public override async void BeforePolling()
32+
{
33+
IDatabase db = multiplexer.GetDatabase();
34+
35+
// create consumer group for each stream key
36+
foreach (RedisKey key in keys)
37+
{
38+
try
39+
{
40+
if (!await db.StreamCreateConsumerGroupAsync(key, consumerGroup))
41+
{
42+
throw new Exception($"Could not create consumer group for stream key {key}");
43+
}
44+
}
45+
catch (RedisServerException e)
46+
{
47+
// consumer group already exists
48+
if (!e.Message.Contains("BUSYGROUP"))
49+
{
50+
throw;
51+
}
52+
}
53+
}
54+
}
55+
56+
public override async Task PollAsync(CancellationToken cancellationToken)
57+
{
58+
IDatabase db = multiplexer.GetDatabase();
59+
RedisStream[] streams = await db.StreamReadGroupAsync(positions, consumerGroup, consumerName, batchSize);
60+
61+
for (int i = 0; i < streams.Length; i++)
62+
{
63+
if (streams[i].Entries.Length > 0)
64+
{
65+
foreach (StreamEntry entry in streams[i].Entries)
66+
{
67+
var triggerValue = new RedisMessageModel
68+
{
69+
Trigger = streams[i].Key,
70+
Message = JsonSerializer.Serialize(entry.Values.ToDictionary(value => value.Name.ToString(), value => value.Value.ToString()))
71+
};
72+
73+
await executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = triggerValue }, cancellationToken);
74+
};
75+
76+
RedisValue[] entryIds = streams[i].Entries.Select(entry => entry.Id).ToArray();
77+
await db.StreamAcknowledgeAsync(streams[i].Key, consumerGroup, entryIds);
78+
79+
if (deleteAfterProcess)
80+
{
81+
await db.StreamDeleteAsync(streams[i].Key, entryIds);
82+
}
83+
}
84+
};
85+
}
86+
87+
public override void BeforeClosing()
88+
{
89+
IDatabase db = multiplexer.GetDatabase();
90+
foreach (RedisKey key in keys)
91+
{
92+
db.StreamDeleteConsumerAsync(key, consumerGroup, consumerName);
93+
}
94+
}
95+
96+
public override Task<RedisPollingMetrics> GetMetricsAsync()
97+
{
98+
var metrics = new RedisPollingMetrics
99+
{
100+
Remaining = keys.Sum((key) => multiplexer.GetDatabase().StreamLength(key)),
101+
Timestamp = DateTime.UtcNow,
102+
};
103+
104+
return Task.FromResult(metrics);
105+
}
106+
}
107+
}

Diff for: src/StreamsTrigger/RedisStreamsTriggerAttribute.cs

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
using Microsoft.Azure.WebJobs.Description;
3+
4+
namespace Microsoft.Azure.WebJobs.Extensions.Redis
5+
{
6+
/// <summary>
7+
/// Streams trigger binding attributes.
8+
/// </summary>
9+
[Binding]
10+
[AttributeUsage(AttributeTargets.Parameter)]
11+
public class RedisStreamsTriggerAttribute : RedisPollingTriggerAttributeBase
12+
{
13+
/// <summary>
14+
/// Name of the consumer group to use when reading the streams.
15+
/// </summary>
16+
public string ConsumerGroup { get; set; } = "AzureFunctionRedisExtension";
17+
18+
/// <summary>
19+
/// If true, the function will delete the stream entries after processing.
20+
/// </summary>
21+
public bool DeleteAfterProcess { get; set; } = false;
22+
23+
}
24+
}

Diff for: src/StreamsTrigger/RedisStreamsTriggerBinding.cs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using System.Collections.Generic;
4+
using Microsoft.Azure.WebJobs.Host.Bindings;
5+
using Microsoft.Azure.WebJobs.Host.Listeners;
6+
using Microsoft.Azure.WebJobs.Host.Protocols;
7+
using Microsoft.Azure.WebJobs.Host.Triggers;
8+
9+
namespace Microsoft.Azure.WebJobs.Extensions.Redis
10+
{
11+
/// <summary>
12+
/// Trigger Binding, manages and binds context to listener.
13+
/// </summary>
14+
internal class RedisStreamsTriggerBinding : ITriggerBinding
15+
{
16+
private readonly string connectionString;
17+
private readonly TimeSpan pollingInterval;
18+
private readonly int messagesPerWorker;
19+
private readonly string keys;
20+
private readonly int count;
21+
private readonly string consumerGroup;
22+
private readonly bool deleteAfterProcess;
23+
24+
public RedisStreamsTriggerBinding(string connectionString, string keys, TimeSpan pollingInterval, int messagesPerWorker, int count, string consumerGroup, bool deleteAfterProcess)
25+
{
26+
this.connectionString = connectionString;
27+
this.keys = keys;
28+
this.pollingInterval = pollingInterval;
29+
this.messagesPerWorker = messagesPerWorker;
30+
this.count = count;
31+
this.consumerGroup = consumerGroup;
32+
this.deleteAfterProcess = deleteAfterProcess;
33+
}
34+
35+
public Type TriggerValueType => typeof(RedisMessageModel);
36+
37+
public IReadOnlyDictionary<string, Type> BindingDataContract => new Dictionary<string, Type>();
38+
39+
public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
40+
{
41+
IReadOnlyDictionary<string, object> bindingData = new Dictionary<string, object>();
42+
return Task.FromResult<ITriggerData>(new TriggerData(null, bindingData));
43+
}
44+
45+
public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
46+
{
47+
if (context == null)
48+
{
49+
throw new ArgumentNullException("context");
50+
}
51+
52+
return Task.FromResult<IListener>(new RedisStreamsListener(connectionString, keys, pollingInterval, messagesPerWorker, count, consumerGroup, deleteAfterProcess, context.Executor));
53+
}
54+
55+
public ParameterDescriptor ToParameterDescriptor()
56+
{
57+
return new ParameterDescriptor();
58+
}
59+
}
60+
}
+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
using System.Reflection;
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Configuration;
5+
using Microsoft.Azure.WebJobs.Host.Triggers;
6+
7+
namespace Microsoft.Azure.WebJobs.Extensions.Redis
8+
{
9+
/// <summary>
10+
/// Provides trigger binding, variables configured in local.settings.json are being retrieved here.
11+
/// </summary>
12+
internal class RedisStreamsTriggerBindingProvider : ITriggerBindingProvider
13+
{
14+
private readonly IConfiguration configuration;
15+
16+
public RedisStreamsTriggerBindingProvider(IConfiguration configuration)
17+
{
18+
this.configuration = configuration;
19+
}
20+
21+
public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
22+
{
23+
if (context == null)
24+
{
25+
throw new ArgumentNullException("context");
26+
}
27+
28+
ParameterInfo parameter = context.Parameter;
29+
RedisStreamsTriggerAttribute attribute = parameter.GetCustomAttribute<RedisStreamsTriggerAttribute>(inherit: false);
30+
31+
if (attribute == null)
32+
{
33+
return Task.FromResult<ITriggerBinding>(null);
34+
}
35+
36+
string connectionString = RedisUtilities.ResolveString(configuration, attribute.ConnectionString, "ConnectionString");
37+
string keys = RedisUtilities.ResolveString(configuration, attribute.Keys, "Keys");
38+
int messagesPerWorker = attribute.MessagesPerWorker;
39+
int batchSize = attribute.BatchSize;
40+
TimeSpan pollingInterval = TimeSpan.FromMilliseconds(attribute.PollingIntervalInMs);
41+
string consumerGroup = RedisUtilities.ResolveString(configuration, attribute.ConsumerGroup, "ConsumerGroup");
42+
bool deleteAfterProcess = attribute.DeleteAfterProcess;
43+
44+
return Task.FromResult<ITriggerBinding>(new RedisStreamsTriggerBinding(connectionString, keys, pollingInterval, messagesPerWorker, batchSize, consumerGroup, deleteAfterProcess));
45+
}
46+
}
47+
}

Diff for: test/Integration/IntegrationTestFunctions.cs

+27-6
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@ public static class IntegrationTestFunctions
1010
public const string pubsubChannel = "testChannel";
1111
public const string keyspaceChannel = "__keyspace@0__:testKey";
1212
public const string keyeventChannel = "__keyevent@0__:set";
13+
public const string keyeventChannelAll = "__keyevent@0__:*";
14+
public const string keyspaceChannelAll = "__keyspace@0__:*";
1315
public const string all = "*";
1416
public const string listSingleKey = "listSingleKey";
1517
public const string listMultipleKeys = "listKey1 listKey2 listKey3";
18+
public const string streamSingleKey = "streamSingleKey";
19+
public const string streamMultipleKeys = "streamKey1 streamKey2 streamKey3";
20+
public const int pollingInterval = 100;
1621
public const int count = 100;
1722

1823
[FunctionName(nameof(PubSubTrigger_SingleChannel))]
@@ -25,7 +30,7 @@ public static void PubSubTrigger_SingleChannel(
2530

2631
[FunctionName(nameof(PubSubTrigger_MultipleChannels))]
2732
public static void PubSubTrigger_MultipleChannels(
28-
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = pubsubChannel + all)] RedisMessageModel model,
33+
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = pubsubChannel + "*")] RedisMessageModel model,
2934
ILogger logger)
3035
{
3136
logger.LogInformation(JsonSerializer.Serialize(model));
@@ -49,15 +54,15 @@ public static void KeySpaceTrigger_SingleKey(
4954

5055
[FunctionName(nameof(KeySpaceTrigger_MultipleKeys))]
5156
public static void KeySpaceTrigger_MultipleKeys(
52-
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = keyspaceChannel + all)] RedisMessageModel model,
57+
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = keyspaceChannel + "*")] RedisMessageModel model,
5358
ILogger logger)
5459
{
5560
logger.LogInformation(JsonSerializer.Serialize(model));
5661
}
5762

5863
[FunctionName(nameof(KeySpaceTrigger_AllKeys))]
5964
public static void KeySpaceTrigger_AllKeys(
60-
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = all)] RedisMessageModel model,
65+
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = keyspaceChannelAll)] RedisMessageModel model,
6166
ILogger logger)
6267
{
6368
logger.LogInformation(JsonSerializer.Serialize(model));
@@ -73,23 +78,39 @@ public static void KeyEventTrigger_SingleEvent(
7378

7479
[FunctionName(nameof(KeyEventTrigger_AllEvents))]
7580
public static void KeyEventTrigger_AllEvents(
76-
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = all)] RedisMessageModel model,
81+
[RedisPubSubTrigger(ConnectionString = connectionString, Channel = keyeventChannelAll)] RedisMessageModel model,
7782
ILogger logger)
7883
{
7984
logger.LogInformation(JsonSerializer.Serialize(model));
8085
}
8186

8287
[FunctionName(nameof(ListsTrigger_SingleKey))]
8388
public static void ListsTrigger_SingleKey(
84-
[RedisListsTrigger(ConnectionString = connectionString, Keys = listSingleKey)] RedisMessageModel result,
89+
[RedisListsTrigger(ConnectionString = connectionString, Keys = listSingleKey, PollingIntervalInMs = pollingInterval)] RedisMessageModel result,
8590
ILogger logger)
8691
{
8792
logger.LogInformation(JsonSerializer.Serialize(result));
8893
}
8994

9095
[FunctionName(nameof(ListsTrigger_MultipleKeys))]
9196
public static void ListsTrigger_MultipleKeys(
92-
[RedisListsTrigger(ConnectionString = connectionString, Keys = listMultipleKeys)] RedisMessageModel result,
97+
[RedisListsTrigger(ConnectionString = connectionString, Keys = listMultipleKeys, PollingIntervalInMs = pollingInterval)] RedisMessageModel result,
98+
ILogger logger)
99+
{
100+
logger.LogInformation(JsonSerializer.Serialize(result));
101+
}
102+
103+
[FunctionName(nameof(StreamsTrigger_DefaultGroup_SingleKey))]
104+
public static void StreamsTrigger_DefaultGroup_SingleKey(
105+
[RedisStreamsTrigger(ConnectionString = connectionString, Keys = streamSingleKey, PollingIntervalInMs = pollingInterval)] RedisMessageModel result,
106+
ILogger logger)
107+
{
108+
logger.LogInformation(JsonSerializer.Serialize(result));
109+
}
110+
111+
[FunctionName(nameof(StreamsTrigger_DefaultGroup_MultipleKeys))]
112+
public static void StreamsTrigger_DefaultGroup_MultipleKeys(
113+
[RedisStreamsTrigger(ConnectionString = connectionString, Keys = streamMultipleKeys, PollingIntervalInMs = pollingInterval)] RedisMessageModel result,
93114
ILogger logger)
94115
{
95116
logger.LogInformation(JsonSerializer.Serialize(result));

Diff for: test/Integration/IntegrationTestHelpers.cs

+2-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections.Generic;
77
using System.Linq;
88
using System.Reflection;
9+
using System.Collections.Concurrent;
910

1011
namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration
1112
{
@@ -64,7 +65,7 @@ void functionLoadedHandler(object sender, DataReceivedEventArgs e)
6465
return functionsProcess;
6566
}
6667

67-
internal static DataReceivedEventHandler CounterHandlerCreator(Dictionary<string, int> counts, TaskCompletionSource<bool> functionExecuted)
68+
internal static DataReceivedEventHandler CounterHandlerCreator(IDictionary<string, int> counts)
6869
{
6970
return (object sender, DataReceivedEventArgs e) =>
7071
{
@@ -74,11 +75,6 @@ internal static DataReceivedEventHandler CounterHandlerCreator(Dictionary<string
7475
{
7576
counts[key] -= 1;
7677
}
77-
78-
if (counts.Values.Sum() == 0)
79-
{
80-
functionExecuted.TrySetResult(true);
81-
}
8278
}
8379
};
8480
}

0 commit comments

Comments
 (0)