Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Production #470

Merged
merged 11 commits into from
Jan 28, 2024
4 changes: 4 additions & 0 deletions ClassTranscribeDatabase/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppSettings

public string RABBITMQ_PORT { get; set; } = "5672";

public string RABBITMQ_AUTOMATIC_RECOVERY = "true";
public string RABBITMQ_HEARTBEAT_SECONDS = "60";


// RABBITMQ_PREFETCH_COUNT has been replaced with these CONCURRENT LIMITS-
//no longer used g RABBITMQ_PREFETCH_COUNT { get; set; } // No longer used; can be deleted in next cleanup
public string MAX_CONCURRENT_TRANSCRIPTIONS { get; set; }
Expand Down
19 changes: 12 additions & 7 deletions ClassTranscribeDatabase/Seed.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using ClassTranscribeDatabase.Models;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using ClassTranscribeDatabase.Models;

namespace ClassTranscribeDatabase
{
Expand Down Expand Up @@ -58,7 +60,10 @@ public void Seed()
if (attempt < maxAttempts)
{
_logger.LogInformation($"Sleeping for {retrySeconds} seconds");
Thread.Sleep(1000 * retrySeconds);
// Thread.Sleep(1000 * retrySeconds);

Task.Delay(TimeSpan.FromSeconds(retrySeconds)).Wait();

}
else
{
Expand Down
9 changes: 6 additions & 3 deletions ClassTranscribeDatabase/Services/RabbitMQConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ private void CreateSharedConnection()
ConnectionRefCount++;
return;
}
Logger.LogInformation("Creating RabbitMQ connection");
var recovery = Convert.ToBoolean(Globals.appSettings.RABBITMQ_AUTOMATIC_RECOVERY);
var heartbeat = Convert.ToUInt32(Globals.appSettings.RABBITMQ_HEARTBEAT_SECONDS);
Logger.LogInformation($"Creating RabbitMQ connection recovery:{recovery} heartbeat:{heartbeat}");
var factory = new ConnectionFactory()
{

HostName = Globals.appSettings.RABBITMQ_SERVER_NAME.Length > 0 ? Globals.appSettings.RABBITMQ_SERVER_NAME : Globals.appSettings.RabbitMQServer,
UserName = Globals.appSettings.ADMIN_USER_ID,
Password = Globals.appSettings.ADMIN_PASSWORD,
Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT) // 5672

Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT), // 5672
AutomaticRecoveryEnabled = recovery,
RequestedHeartbeat = TimeSpan.FromSeconds(heartbeat)
};
// A developer may still want to checkout old code which uses the old env branch
// so just complain loudly for now
Expand Down
4 changes: 2 additions & 2 deletions ClassTranscribeServer/Controllers/CaptionsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public async Task<ActionResult<Caption>> PostCaption(Caption modifiedCaption)
};
_context.Captions.Add(newCaption);
await _context.SaveChangesAsync();
_wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId);
// nope _wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId);
return newCaption;
}

Expand Down Expand Up @@ -337,7 +337,7 @@ public async Task<ActionResult<IEnumerable<Caption>>> PostCaptionFile(IFormFile
await _context.Captions.AddRangeAsync(captions);
await _context.SaveChangesAsync();

_wakeDownloader.UpdateVTTFile(transcription.Id);
//nope _wakeDownloader.UpdateVTTFile(transcription.Id);

return captions;
}
Expand Down
18 changes: 9 additions & 9 deletions ClassTranscribeServer/Utils/WakeDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public virtual void UpdatePlaylist(string playlistId)
Wake(msg);
}

public virtual void UpdateVTTFile(string transcriptionId)
{
JObject msg = new JObject
{
{ "Type", TaskType.GenerateVTTFile.ToString() },
{ "TranscriptionId", transcriptionId }
};
Wake(msg);
}
// public virtual void UpdateVTTFile(string transcriptionId)
// {
// JObject msg = new JObject
// {
// { "Type", TaskType.GenerateVTTFile.ToString() },
// { "TranscriptionId", transcriptionId }
// };
// Wake(msg);
// }

public void UpdateOffering(string offeringId)
{
Expand Down
4 changes: 2 additions & 2 deletions PythonRpcServer/kaltura.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
KALTURA_PARTNER_ID = int(os.getenv('KALTURA_PARTNER_ID', default=0))
KALTURA_TOKEN_ID = os.getenv('KALTURA_TOKEN_ID', default=None)
KATLURA_APP_TOKEN = os.getenv('KALTURA_APP_TOKEN', default=None)

KALTURA_TIMEOUT= int( os.getenv('KALTURA_TIMEOUT', default=30))

# Examples of Playlists URLs the user is likely to see-
# Playlist 1_eilnj5er is Angrave's short set of example vidos
Expand Down Expand Up @@ -181,7 +181,7 @@ def getMediaInfosForKalturaChannel(self, partnerInfo, channelId):
return self.getSensibleMediaInfos(res)

def downloadLecture(self, url):
filePath, extension = download_file(url)
filePath, extension = download_file(url, timeout=KALTURA_TIMEOUT)
return filePath, extension

#Exxpects
Expand Down
4 changes: 2 additions & 2 deletions PythonRpcServer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ def extension_from_magic_bytes(filepath):
# Filepath and cookies may be specified
# Returns a two tuple, [filepath, extension]
# An appropriate Extension is guessed based on the mimetype in the 'content-type' response header
def download_file(url, filepath=None, cookies=None):
def download_file(url, filepath=None, cookies=None, timeout=60):
# NOTE the stream=True parameter below
if not filepath:
filepath = getTmpFile()
extension = None
with requests.get(url, stream=True, allow_redirects=True, cookies=cookies) as r:
with requests.get(url, stream=True, allow_redirects=True, cookies=cookies, timeout=timeout) as r:


r.raise_for_status()
Expand Down
165 changes: 91 additions & 74 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using TaskEngine.Tasks;
using System.Threading;

using Newtonsoft.Json.Linq;

using static ClassTranscribeDatabase.CommonUtils;
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;

using TaskEngine.Tasks;

namespace TaskEngine
{
Expand All @@ -29,14 +33,32 @@ class Program
public static ServiceProvider _serviceProvider;
public static ILogger<Program> _logger;
public static void Main()
{
Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
try {
SetupServices(); // should never return
createTaskQueues();
runQueueAwakerForever();

} catch (Exception e) {
// Some paranoia here; we *should* have a logger and exception handler in place
// So this is only here to catch unexpected startup errors that otherwise might be silent
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}
}
public static void SetupServices()
{
var configuration = CTDbContext.GetConfigurations();

// This project relies on Dependency Injection to configure its various services,
// For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
// All the services used are configured using the service provider.
Console.WriteLine("SetupServices() - starting");

var serviceProvider = new ServiceCollection()
_serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
Expand Down Expand Up @@ -80,10 +102,9 @@ public static void Main()
.AddSingleton<TempCode>()
.BuildServiceProvider();

_serviceProvider = serviceProvider;
_logger = serviceProvider.GetRequiredService<ILogger<Program>>();
_logger = _serviceProvider.GetRequiredService<ILogger<Program>>();

Globals.appSettings = serviceProvider.GetService<IOptions<AppSettings>>().Value;
Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

AppDomain currentDomain = AppDomain.CurrentDomain;
Expand All @@ -92,112 +113,108 @@ public static void Main()
_logger.LogInformation("Seeding database");

// Seed the database, with some initial data.
Seeder seeder = serviceProvider.GetService<Seeder>();
Seeder seeder = _serviceProvider.GetService<Seeder>();
seeder.Seed();
}

_logger.LogInformation("Starting TaskEngine");

static void runQueueAwakerForever() {
_logger.LogInformation("runQueueAwakerForever - start");
QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
try {
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
} catch (Exception e) {
_logger.LogError(e, "Error in Periodic Check");
}
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
_logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
};
}
static void createTaskQueues() {
_logger.LogInformation("createTaskQueues() -starting");
// Delete any pre-existing queues on rabbitMQ.
RabbitMQConnection rabbitMQ = serviceProvider.GetService<RabbitMQConnection>();
RabbitMQConnection rabbitMQ = _serviceProvider.GetService<RabbitMQConnection>();

// Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed

ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);
ushort concurrent_describe_images = 1;
ushort concurrent_describe_videos = 1;

ushort concurrent_describe_images = NO_CONCURRENCY;
ushort concurrent_describe_videos = NO_CONCURRENCY;

// Create and start consuming from all queues. If concurrency >=1 the queues are purged


// Upstream Sync related
_logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} ");
serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);

// Transcription Related
_logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} ");

serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);
_serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);

// no more! - serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);
// no more! - _serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);

// Video Processing Related
_logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} ");
serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
_serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
// Descriptions
serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);


_serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
_serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);

// SceneDetection now handled by native Python
// See https://github.com/classtranscribe/pyapi
serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);

// Outdated Elastic Search index would be removed
serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

_logger.LogInformation("Done creating task consumers");
//nolonger used :
// nope serviceProvider.GetService<nope ConvertVideoToWavTask>().Consume(concurrent_videotasks);

bool hacktest = false;
if (hacktest)
{
TempCode tempCode = serviceProvider.GetService<TempCode>();
tempCode.Temp();
return;
}
_logger.LogInformation("All done!");

QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

Thread.Sleep(initialPauseInterval);

// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
Thread.Sleep(timeInterval);
};
_serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);
_logger.LogInformation("createTaskQueues() - Done creating task consumers");
}

// Catch all unhandled exceptions.
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
_logger.LogError(e, "Unhandled Exception Caught");
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}

private static ushort ToUInt16(String val, ushort defaultVal)
Expand Down
Loading
Loading