Skip to content

Update blob-storage-databus-cleanup-function sample to NServiceBus 10 #7520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.9.34526.213
MinimumVisualStudioVersion = 15.0.26730.12
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SenderAndReceiver", "SenderAndReceiver\SenderAndReceiver.csproj", "{B656A5F2-29C7-43EE-86D9-FEC497962600}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DataBusBlobCleanupFunctions", "DataBusBlobCleanupFunctions\DataBusBlobCleanupFunctions.csproj", "{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B656A5F2-29C7-43EE-86D9-FEC497962600}.Release|Any CPU.Build.0 = Release|Any CPU
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4E54C4B8-6A71-4ADE-B7D6-E83082BD50DC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {F5306B81-00A3-4AEA-8E05-AB387F2542AC}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<LangVersion>12.0</LangVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="1.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.*" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.0" />
</ItemGroup>

<ItemGroup>
<None Update="host.json" CopyToOutputDirectory="PreserveNewest" />
<None Update="local.settings.json" CopyToOutputDirectory="PreserveNewest" CopyToPublishDirectory="Never" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DataBusBlobCleanupFunctions", "DataBusBlobCleanupFunctions.csproj", "{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C34EAECB-5CF4-42F5-9428-3C0C3CCEDFD4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65FDA38E-030A-45D0-99D6-3606931E9935}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

public class DataBusBlobCreated(ILogger<DataBusBlobCreated> logger)
{
#region DataBusBlobCreatedFunction

[Function(nameof(DataBusBlobCreated))]
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] Stream blob, string name, Uri uri, IDictionary<string, string> metadata, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
{
logger.LogInformation("Blob created at {uri}", uri);

var instanceId = name;
var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);

if (existingInstance != null)
{
logger.LogInformation("{DataBusCleanupOrchestratorName} has already been started for blob {uri}.", DataBusCleanupOrchestratorName, uri);
return;
}

var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(metadata);

if (validUntilUtc == DateTime.MaxValue)
{
logger.LogError("Could not parse the 'ValidUntil' value for blob {uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", uri);
return;
}

await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(DataBusCleanupOrchestratorName, new DataBusBlobData
{
Name = name,
ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
},
new StartOrchestrationOptions()
{
InstanceId = instanceId
}, cancellationToken);
}

#endregion

static readonly string DataBusCleanupOrchestratorName = nameof(DataBusCleanupOrchestrator);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
public class DataBusBlobData
{
public required string Name { get; set; }

public required string ValidUntilUtc { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@

using System.Globalization;

public static class DataBusBlobTimeoutCalculator
{
#region GetValidUntil

public static DateTime GetValidUntil(IDictionary<string, string> blobMetadata)
{
if (blobMetadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
{
return ToUtcDateTime(validUntilUtcString);
}

return DateTime.MaxValue;
}

#endregion

/// <summary>
/// Converts the <see cref="DateTime" /> to a <see cref="string" /> suitable for transport over the wire.
/// </summary>
public static string ToWireFormattedString(DateTime dateTime)
{
return dateTime.ToUniversalTime().ToString(format, CultureInfo.InvariantCulture);
}

/// <summary>
/// Converts a wire-formatted <see cref="string" /> from <see cref="ToWireFormattedString" /> to a UTC
/// <see cref="DateTime" />.
/// </summary>
public static DateTime ToUtcDateTime(string wireFormattedString)
{
if (string.IsNullOrWhiteSpace(wireFormattedString))
{
throw new ArgumentNullException(nameof(wireFormattedString));
}

if (wireFormattedString.Length != format.Length)
{
throw new FormatException(errorMessage);
}

var year = 0;
var month = 0;
var day = 0;
var hour = 0;
var minute = 0;
var second = 0;
var microSecond = 0;

for (var i = 0; i < format.Length; i++)
{
var digit = wireFormattedString[i];

switch (format[i])
{
case 'y':
Guard(digit);
year = year * 10 + (digit - '0');
break;

case 'M':
Guard(digit);
month = month * 10 + (digit - '0');
break;

case 'd':
Guard(digit);
day = day * 10 + (digit - '0');
break;

case 'H':
Guard(digit);
hour = hour * 10 + (digit - '0');
break;

case 'm':
Guard(digit);
minute = minute * 10 + (digit - '0');
break;

case 's':
Guard(digit);
second = second * 10 + (digit - '0');
break;

case 'f':
Guard(digit);
microSecond = microSecond * 10 + (digit - '0');
break;
}
}

return AddMicroseconds(new DateTime(year, month, day, hour, minute, second, DateTimeKind.Utc), microSecond);

void Guard(char digit)
{
if (digit >= '0' && digit <= '9')
{
return;
}

throw new FormatException(errorMessage);
}

DateTime AddMicroseconds(DateTime self, int microseconds)
{
return self.AddTicks(microseconds * ticksPerMicrosecond);
}
}

const string format = "yyyy-MM-dd HH:mm:ss:ffffff Z";
const string errorMessage = "String was not recognized as a valid DateTime.";
const int ticksPerMicrosecond = 10;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#nullable disable

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class DataBusCleanupOrchestrator(ILogger<DataBusCleanupOrchestrator> logger)
{
#region DataBusCleanupOrchestratorFunction

[Function(nameof(DataBusCleanupOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
{
var blobData = context.GetInput<DataBusBlobData>();

logger.LogInformation("Orchestrating deletion for blob at {name} with ValidUntilUtc of {validUntilUtc}", blobData.Name, blobData.ValidUntilUtc);

var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);

DateTime timeoutUntil;

//Timeouts currently have a 7 day limit, use 6 day loops until the wait is less than 6 days
do
{
timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;

logger.LogInformation("Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Name}. Currently {context.CurrentUtcDateTime}.", timeoutUntil, validUntilUtc, blobData.Name, context.CurrentUtcDateTime);

await context.CreateTimer(DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc), CancellationToken.None);
} while (validUntilUtc > timeoutUntil);

await context.CallActivityAsync("DeleteBlob", blobData);
}

#endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Net;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

public class DataBusOrchestrateExistingBlobs(BlobContainerClient blobContainerClient, ILogger<DataBusOrchestrateExistingBlobs> logger)
{
#region DataBusOrchestrateExistingBlobsFunction

[Function(nameof(DataBusOrchestrateExistingBlobs))]
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
{
var counter = 0;

try
{
var segment = blobContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, cancellationToken: cancellationToken).AsPages();

await foreach (var blobPage in segment)
{
foreach (var blobItem in blobPage.Values)
{
var instanceId = blobItem.Name;

var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);

if (existingInstance != null)
{
logger.LogInformation("{name} has already been started for blob {blobItemName}.", nameof(DataBusCleanupOrchestrator), blobItem.Name);
continue;
}

var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blobItem.Metadata);

if (validUntilUtc == DateTime.MaxValue)
{
logger.LogError("Could not parse the 'ValidUntilUtc' value for blob {name}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", blobItem.Name);
continue;
}

await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(nameof(DataBusCleanupOrchestrator), new DataBusBlobData
{
Name = blobItem.Name,
ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
},
new StartOrchestrationOptions()
{
InstanceId = instanceId
}, cancellationToken);

counter++;
}
}
}
catch (Exception exception)
{
var result = new ObjectResult(exception.Message)
{
StatusCode = (int)HttpStatusCode.InternalServerError
};

return result;
}

var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);

return new OkObjectResult(message);
}

#endregion
}
Loading
Loading