Skip to content

Commit

Permalink
Add more logging to Taskengine + refactor main
Browse files Browse the repository at this point in the history
  • Loading branch information
angrave committed Jan 28, 2024
1 parent fda94dd commit ac632e8
Showing 1 changed file with 81 additions and 70 deletions.
151 changes: 81 additions & 70 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,32 @@ class Program
public static ServiceProvider _serviceProvider;
public static ILogger<Program> _logger;
public static void Main()
{
Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
try {
SetupServices(); // should never return
createTaskQueues();
runQueueAwakerForever();

} catch (Exception e) {
// Some paranoia here; we *should* have a logger and exception handler in place
// So this is only here to catch unexpected startup errors that otherwise might be silent
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}
}
public static void SetupServices()
{
var configuration = CTDbContext.GetConfigurations();

// This project relies on Dependency Injection to configure its various services,
// For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
// All the services used are configured using the service provider.
Console.WriteLine("SetupServices() - starting");

var serviceProvider = new ServiceCollection()
_serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
Expand Down Expand Up @@ -84,10 +102,9 @@ public static void Main()
.AddSingleton<TempCode>()
.BuildServiceProvider();

_serviceProvider = serviceProvider;
_logger = serviceProvider.GetRequiredService<ILogger<Program>>();
_logger = _serviceProvider.GetRequiredService<ILogger<Program>>();

Globals.appSettings = serviceProvider.GetService<IOptions<AppSettings>>().Value;
Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

AppDomain currentDomain = AppDomain.CurrentDomain;
Expand All @@ -96,114 +113,108 @@ public static void Main()
_logger.LogInformation("Seeding database");

// Seed the database, with some initial data.
Seeder seeder = serviceProvider.GetService<Seeder>();
Seeder seeder = _serviceProvider.GetService<Seeder>();
seeder.Seed();
}

_logger.LogInformation("Starting TaskEngine");

static void runQueueAwakerForever() {
_logger.LogInformation("runQueueAwakerForever - start");
QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
try {
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
} catch (Exception e) {
_logger.LogError(e, "Error in Periodic Check");
}
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
_logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
};
}
static void createTaskQueues() {
_logger.LogInformation("createTaskQueues() -starting");
// Delete any pre-existing queues on rabbitMQ.
RabbitMQConnection rabbitMQ = serviceProvider.GetService<RabbitMQConnection>();
RabbitMQConnection rabbitMQ = _serviceProvider.GetService<RabbitMQConnection>();

// Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed

ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);
ushort concurrent_describe_images = 1;
ushort concurrent_describe_videos = 1;

ushort concurrent_describe_images = NO_CONCURRENCY;
ushort concurrent_describe_videos = NO_CONCURRENCY;

// Create and start consuming from all queues. If concurrency >=1 the queues are purged


// Upstream Sync related
_logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} ");
serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);

// Transcription Related
_logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} ");

serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);
_serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);

// no more! - serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);
// no more! - _serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);

// Video Processing Related
_logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} ");
serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
_serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
// Descriptions
serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);


_serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
_serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);

// SceneDetection now handled by native Python
// See https://github.com/classtranscribe/pyapi
serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);

// Outdated Elastic Search index would be removed
serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

_logger.LogInformation("Done creating task consumers");
//nolonger used :
// nope serviceProvider.GetService<nope ConvertVideoToWavTask>().Consume(concurrent_videotasks);

bool hacktest = false;
if (hacktest)
{
TempCode tempCode = serviceProvider.GetService<TempCode>();
tempCode.Temp();
return;
}
_logger.LogInformation("All done!");

QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{

queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
};
_serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);
_logger.LogInformation("createTaskQueues() - Done creating task consumers");
}

// Catch all unhandled exceptions.
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
_logger.LogError(e, "Unhandled Exception Caught");
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}

private static ushort ToUInt16(String val, ushort defaultVal)
Expand Down

0 comments on commit ac632e8

Please sign in to comment.