From fae1d032a0ba35442f26c323ed0f6232eef04d9f Mon Sep 17 00:00:00 2001 From: Lawrence Angrave Date: Fri, 26 Jan 2024 16:32:55 -0600 Subject: [PATCH 1/2] Configurable Taskengine initial pause --- ClassTranscribeDatabase/Globals.cs | 2 ++ TaskEngine/Program.cs | 11 ++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ClassTranscribeDatabase/Globals.cs b/ClassTranscribeDatabase/Globals.cs index 48e49ed..3bdbcca 100644 --- a/ClassTranscribeDatabase/Globals.cs +++ b/ClassTranscribeDatabase/Globals.cs @@ -53,6 +53,8 @@ public class AppSettings public string BUILDNUMBER { get; set; } public string ES_CONNECTION_ADDR { get; set; } + public string INITIAL_TASKENGINE_PAUSE_MINUTES { get; set; } = "2"; + // We let the message expire - which is likely if the server is overloaded- the periodic check will rediscover things to do later // The suggested value is PERIODIC_CHECK_EVERY_MINUTES -5 // i.e we assume that the periodic check takes no more than 5 minutes to enqueue a task diff --git a/TaskEngine/Program.cs b/TaskEngine/Program.cs index 6588465..4c85cff 100644 --- a/TaskEngine/Program.cs +++ b/TaskEngine/Program.cs @@ -69,7 +69,7 @@ public static void Main() .AddSingleton() .AddSingleton() .AddSingleton() - .AddSingleton() + // .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton() @@ -141,8 +141,8 @@ public static void Main() // We dont want concurrency for these tasks _logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers."); serviceProvider.GetService().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY? - serviceProvider.GetService().Consume(NO_CONCURRENCY); - serviceProvider.GetService().Consume(NO_CONCURRENCY); + // does nothing at the moment serviceProvider.GetService().Consume(NO_CONCURRENCY); + serviceProvider.GetService().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode); // Elastic Search index should be built after TranscriptionTask serviceProvider.GetService().Consume(NO_CONCURRENCY); @@ -170,11 +170,12 @@ public static void Main() QueueAwakerTask queueAwakerTask = serviceProvider.GetService(); int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES)); - + int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES)); + _logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck); var timeInterval = new TimeSpan(0, periodicCheck, 0); - var initialPauseInterval = new TimeSpan(0, 2, 0); + var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0); _logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval); Thread.Sleep(initialPauseInterval); From ac57d33989ad69d973fab2d84e83619439fc23e5 Mon Sep 17 00:00:00 2001 From: Lawrence Angrave Date: Sat, 27 Jan 2024 15:31:54 -0600 Subject: [PATCH 2/2] Shared Box Client --- ClassTranscribeDatabase/Services/BoxAPI.cs | 90 +++++++++++++++------- 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/ClassTranscribeDatabase/Services/BoxAPI.cs b/ClassTranscribeDatabase/Services/BoxAPI.cs index 4b6abb0..26fe8fc 100644 --- a/ClassTranscribeDatabase/Services/BoxAPI.cs +++ b/ClassTranscribeDatabase/Services/BoxAPI.cs @@ -1,12 +1,17 @@ -using Box.V2; +using System; +using System.Linq; +using System.Threading.Tasks; +using System.Threading; + +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +using Box.V2; using Box.V2.Auth; using Box.V2.Config; + using ClassTranscribeDatabase.Models; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; -using System; -using System.Linq; -using System.Threading.Tasks; + namespace ClassTranscribeDatabase.Services { @@ -14,6 +19,10 @@ public class BoxAPI { private readonly SlackLogger _slack; private readonly ILogger _logger; + private BoxClient? _boxClient; + private DateTimeOffset _lastRefreshed = DateTimeOffset.MinValue; + private SemaphoreSlim _RefreshSemaphore = new SemaphoreSlim(1, 1); // async-safe mutex to ensure only one thread is refreshing the token at a time + public BoxAPI(ILogger logger, SlackLogger slack) { _logger = logger; @@ -30,6 +39,7 @@ public async Task CreateAccessTokenAsync(string authCode) // This implementation is overly chatty with the database, but we rarely create access tokens so it is not a problem using (var _context = CTDbContext.CreateDbContext()) { + if (!await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).AnyAsync()) { _context.Dictionaries.Add(new Dictionary @@ -47,13 +57,15 @@ public async Task CreateAccessTokenAsync(string authCode) await _context.SaveChangesAsync(); } - + var accessToken = _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).First(); var refreshToken = _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).First(); var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost")); - var client = new Box.V2.BoxClient(config); - var auth = await client.Auth.AuthenticateAsync(authCode); - _logger.LogInformation("Created Box Tokens"); + var tmpClient = new Box.V2.BoxClient(config); + var auth = await tmpClient.Auth.AuthenticateAsync(authCode); + + _logger.LogInformation($"Created Box Tokens Access:({auth.AccessToken.Substring(0, 5)}) Refresh({auth.RefreshToken.Substring(0, 5)})"); + accessToken.Value = auth.AccessToken; refreshToken.Value = auth.RefreshToken; await _context.SaveChangesAsync(); @@ -62,31 +74,37 @@ public async Task CreateAccessTokenAsync(string authCode) /// /// Updates the accessToken and refreshToken. These keys must already exist in the Dictionary table. /// - public async Task RefreshAccessTokenAsync() + private async Task RefreshAccessTokenAsync() { + // Only one thread should call this at a time (see semaphore in GetBoxClientAsync) try { + _logger.LogInformation($"RefreshAccessTokenAsync: Starting"); using (var _context = CTDbContext.CreateDbContext()) { var accessToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).FirstAsync(); var refreshToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).FirstAsync(); var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost")); - var auth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer"); - var client = new BoxClient(config, auth); - /// Try to refresh the access token - auth = await client.Auth.RefreshAccessTokenAsync(auth.AccessToken); + var initialAuth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer"); + var initialClient = new BoxClient(config, initialAuth); + /// Refresh the access token + var auth = await initialClient.Auth.RefreshAccessTokenAsync(initialAuth.AccessToken); /// Create the client again - client = new BoxClient(config, auth); - _logger.LogInformation("Refreshed Tokens"); + _logger.LogInformation($"RefreshAccessTokenAsync: New Access Token ({auth.AccessToken.Substring(0, 5)}), New Refresh Token ({auth.RefreshToken.Substring(0, 5)})"); + accessToken.Value = auth.AccessToken; refreshToken.Value = auth.RefreshToken; + _lastRefreshed = DateTimeOffset.Now; await _context.SaveChangesAsync(); + _logger.LogInformation($"RefreshAccessTokenAsync: Creating New Box Client"); + var client = new BoxClient(config, auth); + return client; } } catch (Box.V2.Exceptions.BoxSessionInvalidatedException e) { - _logger.LogError(e, "Box Token Failure."); - await _slack.PostErrorAsync(e, "Box Token Failure."); + _logger.LogError(e, "RefreshAccessTokenAsync: Box Token Failure."); + await _slack.PostErrorAsync(e, "RefreshAccessTokenAsync: Box Token Failure."); throw; } } @@ -95,18 +113,32 @@ public async Task RefreshAccessTokenAsync() /// public async Task GetBoxClientAsync() { - // Todo RefreshAccessTokenAsync could return this information for us; and avoid another trip to the database - await RefreshAccessTokenAsync(); - BoxClient boxClient; - using (var _context = CTDbContext.CreateDbContext()) + try { - var accessToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).FirstAsync(); - var refreshToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).FirstAsync(); - var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost")); - var auth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer"); - boxClient = new Box.V2.BoxClient(config, auth); + await _RefreshSemaphore.WaitAsync(); // // critical section : implementation of an async-safe mutex + var MAX_AGE_MINUTES = 50; + var remain = DateTimeOffset.Now.Subtract(_lastRefreshed).TotalMinutes; + _logger.LogInformation($"GetBoxClientAsync: {remain} minutes since last refresh. Max age {MAX_AGE_MINUTES}."); + if (_boxClient != null && remain < MAX_AGE_MINUTES) + { + return _boxClient; + } + _boxClient = await RefreshAccessTokenAsync(); + _logger.LogInformation($"GetBoxClientAsync: _boxClient updated"); + } + catch (Exception e) + { + _logger.LogError(e, "GetBoxClientAsync: Box Refresh Failure."); + throw; } - return boxClient; + finally + { + _logger.LogInformation($"GetBoxClientAsync: Releasing Semaphore and returning"); + _RefreshSemaphore.Release(1); + } + + return _boxClient; } + } }