Skip to content

Commit a2a8200

Browse files
authored
Merge pull request #1266 from wanxms/master
[Providers.Azure] Move away from deprecated libs and support TokenCredential
2 parents d1af4d5 + a614b7d commit a2a8200

File tree

7 files changed

+142
-71
lines changed

7 files changed

+142
-71
lines changed

src/providers/WorkflowCore.Providers.Azure/Models/ControlledLock.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
using System;
2-
using Microsoft.WindowsAzure.Storage.Blob;
1+
using Azure.Storage.Blobs.Specialized;
32

43
namespace WorkflowCore.Providers.Azure.Models
54
{
65
class ControlledLock
76
{
87
public string Id { get; set; }
98
public string LeaseId { get; set; }
10-
public CloudBlockBlob Blob { get; set; }
9+
public BlockBlobClient Blob { get; set; }
1110

12-
public ControlledLock(string id, string leaseId, CloudBlockBlob blob)
11+
public ControlledLock(string id, string leaseId, BlockBlobClient blob)
1312
{
1413
Id = id;
1514
LeaseId = leaseId;

src/providers/WorkflowCore.Providers.Azure/ServiceCollectionExtensions.cs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Microsoft.Azure.Cosmos;
1+
using System;
2+
using Azure.Core;
3+
using Microsoft.Azure.Cosmos;
24
using Microsoft.Extensions.Logging;
35
using WorkflowCore.Interface;
46
using WorkflowCore.Models;
@@ -16,6 +18,13 @@ public static WorkflowOptions UseAzureSynchronization(this WorkflowOptions optio
1618
return options;
1719
}
1820

21+
public static WorkflowOptions UseAzureSynchronization(this WorkflowOptions options, Uri blobEndpoint, Uri queueEndpoint, TokenCredential tokenCredential)
22+
{
23+
options.UseQueueProvider(sp => new AzureStorageQueueProvider(queueEndpoint, tokenCredential, sp.GetService<ILoggerFactory>()));
24+
options.UseDistributedLockManager(sp => new AzureLockManager(blobEndpoint, tokenCredential, sp.GetService<ILoggerFactory>()));
25+
return options;
26+
}
27+
1928
public static WorkflowOptions UseAzureServiceBusEventHub(
2029
this WorkflowOptions options,
2130
string connectionString,
@@ -28,6 +37,19 @@ public static WorkflowOptions UseAzureServiceBusEventHub(
2837
return options;
2938
}
3039

40+
public static WorkflowOptions UseAzureServiceBusEventHub(
41+
this WorkflowOptions options,
42+
string fullyQualifiedNamespace,
43+
TokenCredential tokenCredential,
44+
string topicName,
45+
string subscriptionName)
46+
{
47+
options.UseEventHub(sp => new ServiceBusLifeCycleEventHub(
48+
fullyQualifiedNamespace, tokenCredential, topicName, subscriptionName, sp.GetService<ILoggerFactory>()));
49+
50+
return options;
51+
}
52+
3153
public static WorkflowOptions UseCosmosDbPersistence(
3254
this WorkflowOptions options,
3355
string connectionString,
@@ -65,5 +87,24 @@ public static WorkflowOptions UseCosmosDbPersistence(
6587
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>(), cosmosDbStorageOptions));
6688
return options;
6789
}
90+
91+
public static WorkflowOptions UseCosmosDbPersistence(
92+
this WorkflowOptions options,
93+
string accountEndpoint,
94+
TokenCredential tokenCredential,
95+
string databaseId,
96+
CosmosDbStorageOptions cosmosDbStorageOptions = null)
97+
{
98+
if (cosmosDbStorageOptions == null)
99+
{
100+
cosmosDbStorageOptions = new CosmosDbStorageOptions();
101+
}
102+
103+
options.Services.AddSingleton<ICosmosClientFactory>(sp => new CosmosClientFactory(accountEndpoint, tokenCredential));
104+
options.Services.AddTransient<ICosmosDbProvisioner>(sp => new CosmosDbProvisioner(sp.GetService<ICosmosClientFactory>(), cosmosDbStorageOptions));
105+
options.Services.AddSingleton<IWorkflowPurger>(sp => new WorkflowPurger(sp.GetService<ICosmosClientFactory>(), databaseId, cosmosDbStorageOptions));
106+
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>(), cosmosDbStorageOptions));
107+
return options;
108+
}
68109
}
69110
}

src/providers/WorkflowCore.Providers.Azure/Services/AzureLockManager.cs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,57 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.IO;
34
using System.Linq;
45
using System.Threading;
56
using System.Threading.Tasks;
7+
using Azure.Core;
8+
using Azure.Storage.Blobs;
9+
using Azure.Storage.Blobs.Specialized;
610
using Microsoft.Extensions.Logging;
7-
using Microsoft.WindowsAzure.Storage;
8-
using Microsoft.WindowsAzure.Storage.Blob;
911
using WorkflowCore.Interface;
1012
using WorkflowCore.Providers.Azure.Models;
1113

1214
namespace WorkflowCore.Providers.Azure.Services
1315
{
1416
public class AzureLockManager: IDistributedLockProvider
1517
{
16-
private readonly CloudBlobClient _client;
18+
private readonly BlobServiceClient _client;
1719
private readonly ILogger _logger;
1820
private readonly List<ControlledLock> _locks = new List<ControlledLock>();
1921
private readonly AutoResetEvent _mutex = new AutoResetEvent(true);
20-
private CloudBlobContainer _container;
22+
private BlobContainerClient _container;
2123
private Timer _renewTimer;
2224
private TimeSpan LockTimeout => TimeSpan.FromMinutes(1);
2325
private TimeSpan RenewInterval => TimeSpan.FromSeconds(45);
2426

2527
public AzureLockManager(string connectionString, ILoggerFactory logFactory)
2628
{
2729
_logger = logFactory.CreateLogger<AzureLockManager>();
28-
var account = CloudStorageAccount.Parse(connectionString);
29-
_client = account.CreateCloudBlobClient();
30+
_client = new BlobServiceClient(connectionString);
31+
}
32+
33+
public AzureLockManager(Uri blobEndpoint, TokenCredential tokenCredential, ILoggerFactory logFactory)
34+
{
35+
_logger = logFactory.CreateLogger<AzureLockManager>();
36+
_client = new BlobServiceClient(blobEndpoint, tokenCredential);
3037
}
3138

3239
public async Task<bool> AcquireLock(string Id, CancellationToken cancellationToken)
3340
{
34-
var blob = _container.GetBlockBlobReference(Id);
41+
var blob = _container.GetBlockBlobClient(Id);
3542

3643
if (!await blob.ExistsAsync())
37-
await blob.UploadTextAsync(string.Empty);
44+
await blob.UploadAsync(new MemoryStream());
3845

3946
if (_mutex.WaitOne())
4047
{
4148
try
4249
{
43-
var leaseId = await blob.AcquireLeaseAsync(LockTimeout);
44-
_locks.Add(new ControlledLock(Id, leaseId, blob));
50+
var lease = await blob.GetBlobLeaseClient().AcquireAsync(LockTimeout);
51+
_locks.Add(new ControlledLock(Id, lease.Value.LeaseId, blob));
4552
return true;
4653
}
47-
catch (StorageException ex)
54+
catch (Exception ex)
4855
{
4956
_logger.LogDebug($"Failed to acquire lock {Id} - {ex.Message}");
5057
return false;
@@ -69,7 +76,7 @@ public async Task ReleaseLock(string Id)
6976
{
7077
try
7178
{
72-
await entry.Blob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.LeaseId));
79+
await entry.Blob.GetBlobLeaseClient(entry.LeaseId).ReleaseAsync();
7380
}
7481
catch (Exception ex)
7582
{
@@ -87,7 +94,7 @@ public async Task ReleaseLock(string Id)
8794

8895
public async Task Start()
8996
{
90-
_container = _client.GetContainerReference("workflowcore-locks");
97+
_container = _client.GetBlobContainerClient("workflowcore-locks");
9198
await _container.CreateIfNotExistsAsync();
9299
_renewTimer = new Timer(RenewLeases, null, RenewInterval, RenewInterval);
93100
}
@@ -128,7 +135,7 @@ private async Task RenewLock(ControlledLock entry)
128135
{
129136
try
130137
{
131-
await entry.Blob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(entry.LeaseId));
138+
await entry.Blob.GetBlobLeaseClient(entry.LeaseId).RenewAsync();
132139
}
133140
catch (Exception ex)
134141
{

src/providers/WorkflowCore.Providers.Azure/Services/AzureStorageQueueProvider.cs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
using System.Collections.Generic;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using Azure.Core;
6+
using Azure.Storage.Queues;
57
using Microsoft.Extensions.Logging;
6-
using Microsoft.WindowsAzure.Storage;
7-
using Microsoft.WindowsAzure.Storage.Queue;
88
using WorkflowCore.Interface;
99

1010
namespace WorkflowCore.Providers.Azure.Services
@@ -13,41 +13,44 @@ public class AzureStorageQueueProvider : IQueueProvider
1313
{
1414
private readonly ILogger _logger;
1515

16-
private readonly Dictionary<QueueType, CloudQueue> _queues = new Dictionary<QueueType, CloudQueue>();
16+
private readonly Dictionary<QueueType, QueueClient> _queues = new Dictionary<QueueType, QueueClient>();
1717

1818
public bool IsDequeueBlocking => false;
1919

2020
public AzureStorageQueueProvider(string connectionString, ILoggerFactory logFactory)
2121
{
2222
_logger = logFactory.CreateLogger<AzureStorageQueueProvider>();
23-
var account = CloudStorageAccount.Parse(connectionString);
24-
var client = account.CreateCloudQueueClient();
23+
var client = new QueueServiceClient(connectionString);
2524

26-
_queues[QueueType.Workflow] = client.GetQueueReference("workflowcore-workflows");
27-
_queues[QueueType.Event] = client.GetQueueReference("workflowcore-events");
28-
_queues[QueueType.Index] = client.GetQueueReference("workflowcore-index");
25+
_queues[QueueType.Workflow] = client.GetQueueClient("workflowcore-workflows");
26+
_queues[QueueType.Event] = client.GetQueueClient("workflowcore-events");
27+
_queues[QueueType.Index] = client.GetQueueClient("workflowcore-index");
28+
}
29+
30+
public AzureStorageQueueProvider(Uri queueEndpoint, TokenCredential tokenCredential, ILoggerFactory logFactory)
31+
{
32+
_logger = logFactory.CreateLogger<AzureStorageQueueProvider>();
33+
var client = new QueueServiceClient(queueEndpoint, tokenCredential);
34+
35+
_queues[QueueType.Workflow] = client.GetQueueClient("workflowcore-workflows");
36+
_queues[QueueType.Event] = client.GetQueueClient("workflowcore-events");
37+
_queues[QueueType.Index] = client.GetQueueClient("workflowcore-index");
2938
}
3039

3140
public async Task QueueWork(string id, QueueType queue)
3241
{
33-
var msg = new CloudQueueMessage(id);
34-
await _queues[queue].AddMessageAsync(msg);
42+
await _queues[queue].SendMessageAsync(id);
3543
}
3644

3745
public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
3846
{
39-
CloudQueue cloudQueue = _queues[queue];
40-
41-
if (cloudQueue == null)
42-
return null;
43-
44-
var msg = await cloudQueue.GetMessageAsync();
47+
var msg = await _queues[queue].ReceiveMessageAsync();
4548

46-
if (msg == null)
49+
if (msg == null || msg.Value == null)
4750
return null;
4851

49-
await cloudQueue.DeleteMessageAsync(msg);
50-
return msg.AsString;
52+
await _queues[queue].DeleteMessageAsync(msg.Value.MessageId, msg.Value.PopReceipt);
53+
return msg.Value.Body.ToString();
5154
}
5255

5356
public async Task Start()

src/providers/WorkflowCore.Providers.Azure/Services/CosmosClientFactory.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using Azure.Core;
23
using Microsoft.Azure.Cosmos;
34
using WorkflowCore.Providers.Azure.Interface;
45

@@ -20,6 +21,11 @@ public CosmosClientFactory(CosmosClient client)
2021
_client = client;
2122
}
2223

24+
public CosmosClientFactory(string accountEndpoint, TokenCredential tokenCredential)
25+
{
26+
_client = new CosmosClient(accountEndpoint, tokenCredential);
27+
}
28+
2329
public CosmosClient GetCosmosClient()
2430
{
2531
return this._client;

0 commit comments

Comments
 (0)