Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions backend/Clients/UsenetProviderConnectionAllocator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NzbWebDAV.Config;

namespace NzbWebDAV.Clients;

public sealed class UsenetProviderConnectionAllocator
{
private readonly UsenetProviderConfig[] _providers;
private readonly int[] _liveConnections;
private readonly object _sync = new();
private int _nextIndex;

public UsenetProviderConnectionAllocator(IEnumerable<UsenetProviderConfig> providers)
{
_providers = providers?.ToArray() ?? throw new ArgumentNullException(nameof(providers));
if (_providers.Length == 0)
{
throw new ArgumentException("At least one usenet provider must be configured.", nameof(providers));
}

_liveConnections = new int[_providers.Length];
}

public int TotalConnections => _providers.Sum(provider => provider.Connections);

public ValueTask<INntpClient> CreateConnectionAsync(CancellationToken cancellationToken)
{
int providerIndex;
UsenetProviderConfig provider;

lock (_sync)
{
providerIndex = ReserveProviderIndex();
provider = _providers[providerIndex];
_liveConnections[providerIndex]++;
}

return CreateProviderConnectionAsync(providerIndex, provider, cancellationToken);
}

private int ReserveProviderIndex()
{
for (var attempt = 0; attempt < _providers.Length; attempt++)
{
var index = (_nextIndex + attempt) % _providers.Length;
if (_liveConnections[index] < _providers[index].Connections)
{
_nextIndex = index + 1;
return index;
}
}

throw new InvalidOperationException("No available usenet provider capacity.");
}

private async ValueTask<INntpClient> CreateProviderConnectionAsync(
int providerIndex,
UsenetProviderConfig provider,
CancellationToken cancellationToken)
{
try
{
var connection = await UsenetStreamingClient.CreateNewConnection(
provider.Host,
provider.Port,
provider.UseSsl,
provider.User,
provider.Pass,
cancellationToken);

return new ProviderScopedNntpClient(connection, () => Release(providerIndex));
}
catch
{
Release(providerIndex);
throw;
}
}

private void Release(int providerIndex)
{
lock (_sync)
{
if (_liveConnections[providerIndex] > 0)
{
_liveConnections[providerIndex]--;
}
}
}

private sealed class ProviderScopedNntpClient : INntpClient
{
private readonly INntpClient _inner;
private readonly Action _onDispose;
private int _disposed;

public ProviderScopedNntpClient(INntpClient inner, Action onDispose)
{
_inner = inner;
_onDispose = onDispose;
}

public Task<bool> ConnectAsync(string host, int port, bool useSsl, CancellationToken cancellationToken)
{
return _inner.ConnectAsync(host, port, useSsl, cancellationToken);
}

public Task<bool> AuthenticateAsync(string user, string pass, CancellationToken cancellationToken)
{
return _inner.AuthenticateAsync(user, pass, cancellationToken);
}

public Task<Usenet.Nntp.Responses.NntpStatResponse> StatAsync(string segmentId, CancellationToken cancellationToken)
{
return _inner.StatAsync(segmentId, cancellationToken);
}

public Task<Streams.YencHeaderStream> GetSegmentStreamAsync(string segmentId, CancellationToken cancellationToken)
{
return _inner.GetSegmentStreamAsync(segmentId, cancellationToken);
}

public Task<Usenet.Yenc.YencHeader> GetSegmentYencHeaderAsync(string segmentId, CancellationToken cancellationToken)
{
return _inner.GetSegmentYencHeaderAsync(segmentId, cancellationToken);
}

public Task<long> GetFileSizeAsync(Usenet.Nzb.NzbFile file, CancellationToken cancellationToken)
{
return _inner.GetFileSizeAsync(file, cancellationToken);
}

public Task<Usenet.Nntp.Responses.NntpDateResponse> DateAsync(CancellationToken cancellationToken)
{
return _inner.DateAsync(cancellationToken);
}

public Task WaitForReady(CancellationToken cancellationToken)
{
return _inner.WaitForReady(cancellationToken);
}

public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 1)
{
return;
}

try
{
_inner.Dispose();
}
finally
{
_onDispose();
}
}
}
}
34 changes: 13 additions & 21 deletions backend/Clients/UsenetStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,8 @@ public UsenetStreamingClient(ConfigManager configManager, WebsocketManager webso
// initialize private members
_websocketManager = websocketManager;

// get connection settings from config-manager
var host = configManager.GetConfigValue("usenet.host") ?? string.Empty;
var port = int.Parse(configManager.GetConfigValue("usenet.port") ?? "119");
var useSsl = bool.Parse(configManager.GetConfigValue("usenet.use-ssl") ?? "false");
var user = configManager.GetConfigValue("usenet.user") ?? string.Empty;
var pass = configManager.GetConfigValue("usenet.pass") ?? string.Empty;
var connections = configManager.GetMaxConnections();

// initialize the nntp-client
var createNewConnection = (CancellationToken ct) => CreateNewConnection(host, port, useSsl, user, pass, ct);
var connectionPool = CreateNewConnectionPool(connections, createNewConnection);
var connectionPool = BuildConnectionPool(configManager.GetUsenetProviders());
var multiConnectionClient = new MultiConnectionNntpClient(connectionPool);
var cache = new MemoryCache(new MemoryCacheOptions() { SizeLimit = 8192 });
_client = new CachingNntpClient(multiConnectionClient, cache);
Expand All @@ -43,17 +34,11 @@ public UsenetStreamingClient(ConfigManager configManager, WebsocketManager webso
!configEventArgs.ChangedConfig.ContainsKey("usenet.use-ssl") &&
!configEventArgs.ChangedConfig.ContainsKey("usenet.user") &&
!configEventArgs.ChangedConfig.ContainsKey("usenet.pass") &&
!configEventArgs.ChangedConfig.ContainsKey("usenet.connections")) return;

// update the connection-pool according to the new config
var connectionCount = int.Parse(configEventArgs.NewConfig["usenet.connections"]);
var newHost = configEventArgs.NewConfig["usenet.host"];
var newPort = int.Parse(configEventArgs.NewConfig["usenet.port"]);
var newUseSsl = bool.Parse(configEventArgs.NewConfig.GetValueOrDefault("usenet.use-ssl", "false"));
var newUser = configEventArgs.NewConfig["usenet.user"];
var newPass = configEventArgs.NewConfig["usenet.pass"];
var newConnectionPool = CreateNewConnectionPool(connectionCount, cancellationToken =>
CreateNewConnection(newHost, newPort, newUseSsl, newUser, newPass, cancellationToken));
!configEventArgs.ChangedConfig.ContainsKey("usenet.connections") &&
!configEventArgs.ChangedConfig.ContainsKey("usenet.providers")) return;

var providers = configManager.GetUsenetProviders();
var newConnectionPool = BuildConnectionPool(providers);
multiConnectionClient.UpdateConnectionPool(newConnectionPool);
};
}
Expand Down Expand Up @@ -123,6 +108,13 @@ private void OnConnectionPoolChanged(object? _, ConnectionPool<INntpClient>.Conn
_websocketManager.SendMessage(WebsocketTopic.UsenetConnections, message);
}

private ConnectionPool<INntpClient> BuildConnectionPool(IReadOnlyList<UsenetProviderConfig> providers)
{
var allocator = new UsenetProviderConnectionAllocator(providers);
var maxConnections = Math.Max(allocator.TotalConnections, 1);
return CreateNewConnectionPool(maxConnections, allocator.CreateConnectionAsync);
}

public static async ValueTask<INntpClient> CreateNewConnection
(
string host,
Expand Down
Loading