Skip to content

Commit

Permalink
Merge pull request #465 from classtranscribe/staging
Browse files Browse the repository at this point in the history
Push to Production
  • Loading branch information
angrave authored Jan 27, 2024
2 parents 2c8d099 + 9dd1712 commit dba6dca
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 34 deletions.
2 changes: 2 additions & 0 deletions ClassTranscribeDatabase/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class AppSettings
public string BUILDNUMBER { get; set; }
public string ES_CONNECTION_ADDR { get; set; }

public string INITIAL_TASKENGINE_PAUSE_MINUTES { get; set; } = "2";

// We let the message expire - which is likely if the server is overloaded- the periodic check will rediscover things to do later
// The suggested value is PERIODIC_CHECK_EVERY_MINUTES -5
// i.e we assume that the periodic check takes no more than 5 minutes to enqueue a task
Expand Down
90 changes: 61 additions & 29 deletions ClassTranscribeDatabase/Services/BoxAPI.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
using Box.V2;
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using Box.V2;
using Box.V2.Auth;
using Box.V2.Config;

using ClassTranscribeDatabase.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Threading.Tasks;


namespace ClassTranscribeDatabase.Services
{
public class BoxAPI
{
private readonly SlackLogger _slack;
private readonly ILogger _logger;
private BoxClient? _boxClient;

Check warning on line 22 in ClassTranscribeDatabase/Services/BoxAPI.cs

View workflow job for this annotation

GitHub Actions / Build

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
private DateTimeOffset _lastRefreshed = DateTimeOffset.MinValue;
private SemaphoreSlim _RefreshSemaphore = new SemaphoreSlim(1, 1); // async-safe mutex to ensure only one thread is refreshing the token at a time

public BoxAPI(ILogger<BoxAPI> logger, SlackLogger slack)
{
_logger = logger;
Expand All @@ -30,6 +39,7 @@ public async Task CreateAccessTokenAsync(string authCode)
// This implementation is overly chatty with the database, but we rarely create access tokens so it is not a problem
using (var _context = CTDbContext.CreateDbContext())
{

if (!await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).AnyAsync())
{
_context.Dictionaries.Add(new Dictionary
Expand All @@ -47,13 +57,15 @@ public async Task CreateAccessTokenAsync(string authCode)
await _context.SaveChangesAsync();
}


var accessToken = _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).First();
var refreshToken = _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).First();
var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost"));
var client = new Box.V2.BoxClient(config);
var auth = await client.Auth.AuthenticateAsync(authCode);
_logger.LogInformation("Created Box Tokens");
var tmpClient = new Box.V2.BoxClient(config);
var auth = await tmpClient.Auth.AuthenticateAsync(authCode);

_logger.LogInformation($"Created Box Tokens Access:({auth.AccessToken.Substring(0, 5)}) Refresh({auth.RefreshToken.Substring(0, 5)})");

accessToken.Value = auth.AccessToken;
refreshToken.Value = auth.RefreshToken;
await _context.SaveChangesAsync();
Expand All @@ -62,31 +74,37 @@ public async Task CreateAccessTokenAsync(string authCode)
/// <summary>
/// Updates the accessToken and refreshToken. These keys must already exist in the Dictionary table.
/// </summary>
public async Task RefreshAccessTokenAsync()
private async Task<BoxClient> RefreshAccessTokenAsync()
{
// Only one thread should call this at a time (see semaphore in GetBoxClientAsync)
try
{
_logger.LogInformation($"RefreshAccessTokenAsync: Starting");
using (var _context = CTDbContext.CreateDbContext())
{
var accessToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).FirstAsync();
var refreshToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).FirstAsync();
var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost"));
var auth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer");
var client = new BoxClient(config, auth);
/// Try to refresh the access token
auth = await client.Auth.RefreshAccessTokenAsync(auth.AccessToken);
var initialAuth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer");
var initialClient = new BoxClient(config, initialAuth);
/// Refresh the access token
var auth = await initialClient.Auth.RefreshAccessTokenAsync(initialAuth.AccessToken);
/// Create the client again
client = new BoxClient(config, auth);
_logger.LogInformation("Refreshed Tokens");
_logger.LogInformation($"RefreshAccessTokenAsync: New Access Token ({auth.AccessToken.Substring(0, 5)}), New Refresh Token ({auth.RefreshToken.Substring(0, 5)})");

accessToken.Value = auth.AccessToken;
refreshToken.Value = auth.RefreshToken;
_lastRefreshed = DateTimeOffset.Now;
await _context.SaveChangesAsync();
_logger.LogInformation($"RefreshAccessTokenAsync: Creating New Box Client");
var client = new BoxClient(config, auth);
return client;
}
}
catch (Box.V2.Exceptions.BoxSessionInvalidatedException e)
{
_logger.LogError(e, "Box Token Failure.");
await _slack.PostErrorAsync(e, "Box Token Failure.");
_logger.LogError(e, "RefreshAccessTokenAsync: Box Token Failure.");
await _slack.PostErrorAsync(e, "RefreshAccessTokenAsync: Box Token Failure.");
throw;
}
}
Expand All @@ -95,18 +113,32 @@ public async Task RefreshAccessTokenAsync()
/// </summary>
public async Task<BoxClient> GetBoxClientAsync()
{
// Todo RefreshAccessTokenAsync could return this information for us; and avoid another trip to the database
await RefreshAccessTokenAsync();
BoxClient boxClient;
using (var _context = CTDbContext.CreateDbContext())
try
{
var accessToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_ACCESS_TOKEN).FirstAsync();
var refreshToken = await _context.Dictionaries.Where(d => d.Key == CommonUtils.BOX_REFRESH_TOKEN).FirstAsync();
var config = new BoxConfig(Globals.appSettings.BOX_CLIENT_ID, Globals.appSettings.BOX_CLIENT_SECRET, new Uri("http://locahost"));
var auth = new OAuthSession(accessToken.Value, refreshToken.Value, 3600, "bearer");
boxClient = new Box.V2.BoxClient(config, auth);
await _RefreshSemaphore.WaitAsync(); // // critical section : implementation of an async-safe mutex
var MAX_AGE_MINUTES = 50;
var remain = DateTimeOffset.Now.Subtract(_lastRefreshed).TotalMinutes;
_logger.LogInformation($"GetBoxClientAsync: {remain} minutes since last refresh. Max age {MAX_AGE_MINUTES}.");
if (_boxClient != null && remain < MAX_AGE_MINUTES)
{
return _boxClient;
}
_boxClient = await RefreshAccessTokenAsync();
_logger.LogInformation($"GetBoxClientAsync: _boxClient updated");
}
catch (Exception e)
{
_logger.LogError(e, "GetBoxClientAsync: Box Refresh Failure.");
throw;
}
return boxClient;
finally
{
_logger.LogInformation($"GetBoxClientAsync: Releasing Semaphore and returning");
_RefreshSemaphore.Release(1);
}

return _boxClient;
}

}
}
11 changes: 6 additions & 5 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static void Main()
.AddSingleton<PythonCrawlerTask>()
.AddSingleton<DescribeVideoTask>()
.AddSingleton<DescribeImageTask>()
.AddSingleton<UpdateBoxTokenTask>()
// .AddSingleton<UpdateBoxTokenTask>()
.AddSingleton<CreateBoxTokenTask>()
.AddSingleton<BuildElasticIndexTask>()
.AddSingleton<ExampleTask>()
Expand Down Expand Up @@ -141,8 +141,8 @@ public static void Main()
// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY);
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
Expand Down Expand Up @@ -170,11 +170,12 @@ public static void Main()
QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));

int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, 2, 0);
var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

Thread.Sleep(initialPauseInterval);
Expand Down

0 comments on commit dba6dca

Please sign in to comment.