Skip to content

Commit

Permalink
Merge pull request #470 from classtranscribe/staging
Browse files Browse the repository at this point in the history
Production
  • Loading branch information
angrave authored Jan 28, 2024
2 parents 6897d4e + e3afbee commit 09bd97c
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 101 deletions.
4 changes: 4 additions & 0 deletions ClassTranscribeDatabase/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppSettings

public string RABBITMQ_PORT { get; set; } = "5672";

public string RABBITMQ_AUTOMATIC_RECOVERY = "true";
public string RABBITMQ_HEARTBEAT_SECONDS = "60";


// RABBITMQ_PREFETCH_COUNT has been replaced with these CONCURRENT LIMITS-
//no longer used g RABBITMQ_PREFETCH_COUNT { get; set; } // No longer used; can be deleted in next cleanup
public string MAX_CONCURRENT_TRANSCRIPTIONS { get; set; }
Expand Down
19 changes: 12 additions & 7 deletions ClassTranscribeDatabase/Seed.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using ClassTranscribeDatabase.Models;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

using ClassTranscribeDatabase.Models;

namespace ClassTranscribeDatabase
{
Expand Down Expand Up @@ -58,7 +60,10 @@ public void Seed()
if (attempt < maxAttempts)
{
_logger.LogInformation($"Sleeping for {retrySeconds} seconds");
Thread.Sleep(1000 * retrySeconds);
// Thread.Sleep(1000 * retrySeconds);

Task.Delay(TimeSpan.FromSeconds(retrySeconds)).Wait();

}
else
{
Expand Down
9 changes: 6 additions & 3 deletions ClassTranscribeDatabase/Services/RabbitMQConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ private void CreateSharedConnection()
ConnectionRefCount++;
return;
}
Logger.LogInformation("Creating RabbitMQ connection");
var recovery = Convert.ToBoolean(Globals.appSettings.RABBITMQ_AUTOMATIC_RECOVERY);
var heartbeat = Convert.ToUInt32(Globals.appSettings.RABBITMQ_HEARTBEAT_SECONDS);
Logger.LogInformation($"Creating RabbitMQ connection recovery:{recovery} heartbeat:{heartbeat}");
var factory = new ConnectionFactory()
{

HostName = Globals.appSettings.RABBITMQ_SERVER_NAME.Length > 0 ? Globals.appSettings.RABBITMQ_SERVER_NAME : Globals.appSettings.RabbitMQServer,
UserName = Globals.appSettings.ADMIN_USER_ID,
Password = Globals.appSettings.ADMIN_PASSWORD,
Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT) // 5672

Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT), // 5672
AutomaticRecoveryEnabled = recovery,
RequestedHeartbeat = TimeSpan.FromSeconds(heartbeat)
};
// A developer may still want to checkout old code which uses the old env branch
// so just complain loudly for now
Expand Down
4 changes: 2 additions & 2 deletions ClassTranscribeServer/Controllers/CaptionsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public async Task<ActionResult<Caption>> PostCaption(Caption modifiedCaption)
};
_context.Captions.Add(newCaption);
await _context.SaveChangesAsync();
_wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId);
// nope _wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId);
return newCaption;
}

Expand Down Expand Up @@ -337,7 +337,7 @@ public async Task<ActionResult<IEnumerable<Caption>>> PostCaptionFile(IFormFile
await _context.Captions.AddRangeAsync(captions);
await _context.SaveChangesAsync();

_wakeDownloader.UpdateVTTFile(transcription.Id);
//nope _wakeDownloader.UpdateVTTFile(transcription.Id);

return captions;
}
Expand Down
18 changes: 9 additions & 9 deletions ClassTranscribeServer/Utils/WakeDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public virtual void UpdatePlaylist(string playlistId)
Wake(msg);
}

public virtual void UpdateVTTFile(string transcriptionId)
{
JObject msg = new JObject
{
{ "Type", TaskType.GenerateVTTFile.ToString() },
{ "TranscriptionId", transcriptionId }
};
Wake(msg);
}
// public virtual void UpdateVTTFile(string transcriptionId)
// {
// JObject msg = new JObject
// {
// { "Type", TaskType.GenerateVTTFile.ToString() },
// { "TranscriptionId", transcriptionId }
// };
// Wake(msg);
// }

public void UpdateOffering(string offeringId)
{
Expand Down
4 changes: 2 additions & 2 deletions PythonRpcServer/kaltura.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
KALTURA_PARTNER_ID = int(os.getenv('KALTURA_PARTNER_ID', default=0))
KALTURA_TOKEN_ID = os.getenv('KALTURA_TOKEN_ID', default=None)
KATLURA_APP_TOKEN = os.getenv('KALTURA_APP_TOKEN', default=None)

KALTURA_TIMEOUT= int( os.getenv('KALTURA_TIMEOUT', default=30))

# Examples of Playlists URLs the user is likely to see-
# Playlist 1_eilnj5er is Angrave's short set of example vidos
Expand Down Expand Up @@ -181,7 +181,7 @@ def getMediaInfosForKalturaChannel(self, partnerInfo, channelId):
return self.getSensibleMediaInfos(res)

def downloadLecture(self, url):
filePath, extension = download_file(url)
filePath, extension = download_file(url, timeout=KALTURA_TIMEOUT)
return filePath, extension

#Exxpects
Expand Down
4 changes: 2 additions & 2 deletions PythonRpcServer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ def extension_from_magic_bytes(filepath):
# Filepath and cookies may be specified
# Returns a two tuple, [filepath, extension]
# An appropriate Extension is guessed based on the mimetype in the 'content-type' response header
def download_file(url, filepath=None, cookies=None):
def download_file(url, filepath=None, cookies=None, timeout=60):
# NOTE the stream=True parameter below
if not filepath:
filepath = getTmpFile()
extension = None
with requests.get(url, stream=True, allow_redirects=True, cookies=cookies) as r:
with requests.get(url, stream=True, allow_redirects=True, cookies=cookies, timeout=timeout) as r:


r.raise_for_status()
Expand Down
165 changes: 91 additions & 74 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using TaskEngine.Tasks;
using System.Threading;

using Newtonsoft.Json.Linq;

using static ClassTranscribeDatabase.CommonUtils;
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;

using TaskEngine.Tasks;

namespace TaskEngine
{
Expand All @@ -29,14 +33,32 @@ class Program
public static ServiceProvider _serviceProvider;
public static ILogger<Program> _logger;
public static void Main()
{
Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
try {
SetupServices(); // should never return
createTaskQueues();
runQueueAwakerForever();

} catch (Exception e) {
// Some paranoia here; we *should* have a logger and exception handler in place
// So this is only here to catch unexpected startup errors that otherwise might be silent
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}
}
public static void SetupServices()
{
var configuration = CTDbContext.GetConfigurations();

// This project relies on Dependency Injection to configure its various services,
// For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
// All the services used are configured using the service provider.
Console.WriteLine("SetupServices() - starting");

var serviceProvider = new ServiceCollection()
_serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
Expand Down Expand Up @@ -80,10 +102,9 @@ public static void Main()
.AddSingleton<TempCode>()
.BuildServiceProvider();

_serviceProvider = serviceProvider;
_logger = serviceProvider.GetRequiredService<ILogger<Program>>();
_logger = _serviceProvider.GetRequiredService<ILogger<Program>>();

Globals.appSettings = serviceProvider.GetService<IOptions<AppSettings>>().Value;
Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

AppDomain currentDomain = AppDomain.CurrentDomain;
Expand All @@ -92,112 +113,108 @@ public static void Main()
_logger.LogInformation("Seeding database");

// Seed the database, with some initial data.
Seeder seeder = serviceProvider.GetService<Seeder>();
Seeder seeder = _serviceProvider.GetService<Seeder>();
seeder.Seed();
}

_logger.LogInformation("Starting TaskEngine");

static void runQueueAwakerForever() {
_logger.LogInformation("runQueueAwakerForever - start");
QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

// Thread.Sleep(initialPauseInterval);
Task.Delay(initialPauseInterval).Wait();
// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
try {
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
} catch (Exception e) {
_logger.LogError(e, "Error in Periodic Check");
}
// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
_logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
};
}
static void createTaskQueues() {
_logger.LogInformation("createTaskQueues() -starting");
// Delete any pre-existing queues on rabbitMQ.
RabbitMQConnection rabbitMQ = serviceProvider.GetService<RabbitMQConnection>();
RabbitMQConnection rabbitMQ = _serviceProvider.GetService<RabbitMQConnection>();

// Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed

ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);
ushort concurrent_describe_images = 1;
ushort concurrent_describe_videos = 1;

ushort concurrent_describe_images = NO_CONCURRENCY;
ushort concurrent_describe_videos = NO_CONCURRENCY;

// Create and start consuming from all queues. If concurrency >=1 the queues are purged


// Upstream Sync related
_logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} ");
serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(concurrent_synctasks);
_serviceProvider.GetService<DownloadMediaTask>().Consume(concurrent_synctasks);

// Transcription Related
_logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} ");

serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);
_serviceProvider.GetService<TranscriptionTask>().Consume(concurrent_transcriptions);

// no more! - serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);
// no more! - _serviceProvider.GetService<GenerateVTTFileTask>().Consume(concurrent_transcriptions);

// Video Processing Related
_logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} ");
serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
_serviceProvider.GetService<ProcessVideoTask>().Consume(concurrent_videotasks);
// Descriptions
serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);


_serviceProvider.GetService<DescribeVideoTask>().Consume(concurrent_describe_videos);
_serviceProvider.GetService<DescribeImageTask>().Consume(concurrent_describe_images);

// SceneDetection now handled by native Python
// See https://github.com/classtranscribe/pyapi
serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

// Elastic Search index should be built after TranscriptionTask
serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);

// Outdated Elastic Search index would be removed
serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);

serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

_logger.LogInformation("Done creating task consumers");
//nolonger used :
// nope serviceProvider.GetService<nope ConvertVideoToWavTask>().Consume(concurrent_videotasks);

bool hacktest = false;
if (hacktest)
{
TempCode tempCode = serviceProvider.GetService<TempCode>();
tempCode.Temp();
return;
}
_logger.LogInformation("All done!");

QueueAwakerTask queueAwakerTask = serviceProvider.GetService<QueueAwakerTask>();

int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));

_logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
var timeInterval = new TimeSpan(0, periodicCheck, 0);

var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0);
_logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval);

Thread.Sleep(initialPauseInterval);

// Check for new tasks every "timeInterval".
// The periodic check will discover all undone tasks
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.PeriodicCheck.ToString() }
});
Thread.Sleep(timeInterval);
};
_serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);
_logger.LogInformation("createTaskQueues() - Done creating task consumers");
}

// Catch all unhandled exceptions.
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
_logger.LogError(e, "Unhandled Exception Caught");
Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
if(_logger !=null){
_logger.LogError(e, "Unhandled Exception Caught");
}
}

private static ushort ToUInt16(String val, ushort defaultVal)
Expand Down
Loading

0 comments on commit 09bd97c

Please sign in to comment.