Skip to content

Commit

Permalink
Merge branch 'release/9.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jericho committed Jan 29, 2024
2 parents 433fc1c + f7f845d commit f404a6e
Show file tree
Hide file tree
Showing 19 changed files with 1,259 additions and 1,161 deletions.
143 changes: 87 additions & 56 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,77 +55,108 @@ using System.Diagnostics;

namespace WorkerRole1
{
public class MyWorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public class MyWorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

public override void Run()
{
Trace.TraceInformation("WorkerRole is running");
public override void Run()
{
Trace.TraceInformation("WorkerRole is running");

try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}

public override bool OnStart()
{
// Use TLS 1.2 for Service Bus connections
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
public override bool OnStart()
{
// Use TLS 1.2 for Service Bus connections
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;

// For information on handling configuration changes
// see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.
// For information on handling configuration changes
// see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
bool result = base.OnStart();

Trace.TraceInformation("WorkerRole has been started");
Trace.TraceInformation("WorkerRole has been started");

return result;
}
return result;
}

public override void OnStop()
{
Trace.TraceInformation("WorkerRole is stopping");
public override void OnStop()
{
Trace.TraceInformation("WorkerRole is stopping");

this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
// Invoking "Cancel()" will cause the AsyncMessagePump to stop
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();

base.OnStop();
base.OnStop();

Trace.TraceInformation("WorkerRole has stopped");
}
Trace.TraceInformation("WorkerRole has stopped");
}

private async Task RunAsync(CancellationToken cancellationToken)
{
var connectionString = "<-- insert connection string for your Azure account -->";
var queueName = "<-- insert the name of your Azure queue -->";
private async Task RunAsync(CancellationToken cancellationToken)
{
var connectionString = "<-- insert connection string for your Azure account -->";
var concurrentTask = 10; // <-- this is the max number of messages that can be processed at a time
// Configure the message pump
var messagePump = new AsyncMessagePump(connectionString, queueName, 10, null, TimeSpan.FromMinutes(1), 3)
{
OnMessage = (message, cancellationToken) =>
{
Debug.WriteLine("Received message of type {message.Content.GetType()}");
},
OnError = (message, exception, isPoison) =>
{
Trace.TraceInformation("An error occured: {0}", exception);
}
};

// Start the message pump
await messagePump.StartAsync(cancellationToken);
}
}
var options = new MessagePumpOptions(connectionString, concurrentTasks);
var messagePump = new AsyncMessagePump(options)
{
OnMessage = (queueName, message, cancellationToken) =>
{
// This is where you insert your custom logic to process a message
},
OnError = (queueName, message, exception, isPoison) =>
{
// Insert your custom error handling
// ==========================================================================
// Important note regarding "isPoison":
// --------------------------------------------------------------------------
// this parameter indicates whether this message has exceeded the maximum
// number of retries.
//
// When you have configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically copied to the poison
// queue and removed from the original queue.
//
// If you have not configured the "poison queue name" for the given queue and
// this parameter is "true", the message is automatically removed from the
// original queue and you are responsible for storing the message. If you don't,
// this mesage will be lost.
// ==========================================================================
}
};

// Replace the following samples with the queues you want to monitor
messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");

// Queues can share the same poison queue
messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");

// Queues can also share the same blob storage for messages that exceed the max size
messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");

// Start the message pump
await messagePump.StartAsync(cancellationToken);
}
}
}
```

Expand Down
Loading

0 comments on commit f404a6e

Please sign in to comment.