Skip to content

Commit

Permalink
Merge pull request #471 from classtranscribe/Update-RabbitMQ
Browse files Browse the repository at this point in the history
Update RabbitMQ Config options
  • Loading branch information
angrave authored Jan 28, 2024
2 parents fa1a49d + ac632e8 commit e3afbee
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 84 deletions.
4 changes: 4 additions & 0 deletions ClassTranscribeDatabase/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppSettings

public string RABBITMQ_PORT { get; set; } = "5672";

public string RABBITMQ_AUTOMATIC_RECOVERY = "true";
public string RABBITMQ_HEARTBEAT_SECONDS = "60";


// RABBITMQ_PREFETCH_COUNT has been replaced with these CONCURRENT LIMITS-
//no longer used g RABBITMQ_PREFETCH_COUNT { get; set; } // No longer used; can be deleted in next cleanup
public string MAX_CONCURRENT_TRANSCRIPTIONS { get; set; }
Expand Down
19 changes: 12 additions & 7 deletions ClassTranscribeDatabase/Seed.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using ClassTranscribeDatabase.Models;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using ClassTranscribeDatabase.Models;

namespace ClassTranscribeDatabase
{
Expand Down Expand Up @@ -58,7 +60,10 @@ public void Seed()
if (attempt < maxAttempts)
{
_logger.LogInformation($"Sleeping for {retrySeconds} seconds");
Thread.Sleep(1000 * retrySeconds);
// Thread.Sleep(1000 * retrySeconds);

Task.Delay(TimeSpan.FromSeconds(retrySeconds)).Wait();

}
else
{
Expand Down
9 changes: 6 additions & 3 deletions ClassTranscribeDatabase/Services/RabbitMQConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ private void CreateSharedConnection()
ConnectionRefCount++;
return;
}
Logger.LogInformation("Creating RabbitMQ connection");
var recovery = Convert.ToBoolean(Globals.appSettings.RABBITMQ_AUTOMATIC_RECOVERY);
var heartbeat = Convert.ToUInt32(Globals.appSettings.RABBITMQ_HEARTBEAT_SECONDS);
Logger.LogInformation($"Creating RabbitMQ connection recovery:{recovery} heartbeat:{heartbeat}");
var factory = new ConnectionFactory()
{

HostName = Globals.appSettings.RABBITMQ_SERVER_NAME.Length > 0 ? Globals.appSettings.RABBITMQ_SERVER_NAME : Globals.appSettings.RabbitMQServer,
UserName = Globals.appSettings.ADMIN_USER_ID,
Password = Globals.appSettings.ADMIN_PASSWORD,
Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT) // 5672

Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT), // 5672
AutomaticRecoveryEnabled = recovery,
RequestedHeartbeat = TimeSpan.FromSeconds(heartbeat)
};
// A developer may still want to checkout old code which uses the old env branch
// so just complain loudly for now
Expand Down
165 changes: 91 additions & 74 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using TaskEngine.Tasks;
using System.Threading;

using Newtonsoft.Json.Linq;

using static ClassTranscribeDatabase.CommonUtils;
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;

using TaskEngine.Tasks;

namespace TaskEngine
{
Expand All @@ -29,14 +33,32 @@ class Program
public static ServiceProvider _serviceProvider;
public static ILogger<Program> _logger;
public static void Main()
{
Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
try {
SetupServices(); // should never return
createTaskQueues();
runQueueAwakerForever();

} catch (Exception e) {
// Some paranoia here; we *should* have a logger and exception handler in place
// So this is only here to catch unexpected startup errors that otherwise might be silent
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}
}
public static void SetupServices()
{
var configuration = CTDbContext.GetConfigurations();

// This project relies on Dependency Injection to configure its various services,
// For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
// All the services used are configured using the service provider.
Console.WriteLine("SetupServices() - starting");

var serviceProvider = new ServiceCollection()
_serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
Expand Down Expand Up @@ -80,10 +102,9 @@ public static void Main()
.AddSingleton<TempCode>()
.BuildServiceProvider();

_serviceProvider = serviceProvider;
_logger = serviceProvider.GetRequiredService<ILogger<Program>>();
_logger = _serviceProvider.GetRequiredService<ILogger<Program>>();

Globals.appSettings = serviceProvider.GetService<IOptions<AppSettings>>().Value;
Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

AppDomain currentDomain = AppDomain.CurrentDomain;
Expand All @@ -92,112 +113,108 @@ public static void Main()
_logger.LogInformation("Seeding database");

// Seed the database, with some initial data.
Seeder seeder = serviceProvider.GetService<Seeder>();
Seeder seeder = _serviceProvider.GetService<Seeder>();
seeder.Seed();
}

_logger.LogInformation("Starting TaskEngine");

static void runQueueAwakerForever() {
_logger.LogInformation("runQueueAwakerForever - start");
QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
try {
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
} catch (Exception e) {
_logger.LogError(e, "Error in Periodic Check");
}
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
_logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
};
}
static void createTaskQueues() {
_logger.LogInformation("createTaskQueues() -starting");
// Delete any pre-existing queues on rabbitMQ.
RabbitMQConnection rabbitMQ = serviceProvider.GetService<RabbitMQConnection>();
RabbitMQConnection rabbitMQ = _serviceProvider.GetService<RabbitMQConnection>();

// Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed

ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);
ushort concurrent_describe_images = 1;
ushort concurrent_describe_videos = 1;

ushort concurrent_describe_images = NO_CONCURRENCY;
ushort concurrent_describe_videos = NO_CONCURRENCY;

// Create and start consuming from all queues. If concurrency >=1 the queues are purged


// Upstream Sync related
_logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} ");
serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);

// Transcription Related
_logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} ");

serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);
_serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);

// no more! - serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);
// no more! - _serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);

// Video Processing Related
_logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} ");
serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
_serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
// Descriptions
serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);


_serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
_serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);

// SceneDetection now handled by native Python
// See https://github.com/classtranscribe/pyapi
serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);

// Outdated Elastic Search index would be removed
serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

_logger.LogInformation("Done creating task consumers");
//nolonger used :
// nope serviceProvider.GetService<nope ConvertVideoToWavTask>().Consume(concurrent_videotasks);

bool hacktest = false;
if (hacktest)
{
TempCode tempCode = serviceProvider.GetService<TempCode>();
tempCode.Temp();
return;
}
_logger.LogInformation("All done!");

QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

Thread.Sleep(initialPauseInterval);

// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
Thread.Sleep(timeInterval);
};
_serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);
_logger.LogInformation("createTaskQueues() - Done creating task consumers");
}

// Catch all unhandled exceptions.
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
_logger.LogError(e, "Unhandled Exception Caught");
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}

private static ushort ToUInt16(String val, ushort defaultVal)
Expand Down

0 comments on commit e3afbee

Please sign in to comment.