diff --git a/ClassTranscribeDatabase/Globals.cs b/ClassTranscribeDatabase/Globals.cs index 3bdbcca..46f6c80 100644 --- a/ClassTranscribeDatabase/Globals.cs +++ b/ClassTranscribeDatabase/Globals.cs @@ -26,6 +26,10 @@ public class AppSettings public string RABBITMQ_PORT { get; set; } = "5672"; + public string RABBITMQ_AUTOMATIC_RECOVERY = "true"; + public string RABBITMQ_HEARTBEAT_SECONDS = "60"; + + // RABBITMQ_PREFETCH_COUNT has been replaced with these CONCURRENT LIMITS- //no longer used g RABBITMQ_PREFETCH_COUNT { get; set; } // No longer used; can be deleted in next cleanup public string MAX_CONCURRENT_TRANSCRIPTIONS { get; set; } diff --git a/ClassTranscribeDatabase/Seed.cs b/ClassTranscribeDatabase/Seed.cs index 24129b0..9015c0d 100644 --- a/ClassTranscribeDatabase/Seed.cs +++ b/ClassTranscribeDatabase/Seed.cs @@ -1,12 +1,14 @@ -using ClassTranscribeDatabase.Models; -using Microsoft.AspNetCore.Identity; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging; -using System; +using System; using System.Collections.Generic; using System.IO; using System.Linq; -using System.Threading; +using System.Threading.Tasks; + +using Microsoft.AspNetCore.Identity; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +using ClassTranscribeDatabase.Models; namespace ClassTranscribeDatabase { @@ -58,7 +60,10 @@ public void Seed() if (attempt < maxAttempts) { _logger.LogInformation($"Sleeping for {retrySeconds} seconds"); - Thread.Sleep(1000 * retrySeconds); + // Thread.Sleep(1000 * retrySeconds); + + Task.Delay(TimeSpan.FromSeconds(retrySeconds)).Wait(); + } else { diff --git a/ClassTranscribeDatabase/Services/RabbitMQConnection.cs b/ClassTranscribeDatabase/Services/RabbitMQConnection.cs index 3c5a361..6787e22 100644 --- a/ClassTranscribeDatabase/Services/RabbitMQConnection.cs +++ b/ClassTranscribeDatabase/Services/RabbitMQConnection.cs @@ -62,15 +62,18 @@ private void CreateSharedConnection() ConnectionRefCount++; return; } - Logger.LogInformation("Creating RabbitMQ connection"); + var recovery = Convert.ToBoolean(Globals.appSettings.RABBITMQ_AUTOMATIC_RECOVERY); + var heartbeat = Convert.ToUInt32(Globals.appSettings.RABBITMQ_HEARTBEAT_SECONDS); + Logger.LogInformation($"Creating RabbitMQ connection recovery:{recovery} heartbeat:{heartbeat}"); var factory = new ConnectionFactory() { HostName = Globals.appSettings.RABBITMQ_SERVER_NAME.Length > 0 ? Globals.appSettings.RABBITMQ_SERVER_NAME : Globals.appSettings.RabbitMQServer, UserName = Globals.appSettings.ADMIN_USER_ID, Password = Globals.appSettings.ADMIN_PASSWORD, - Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT) // 5672 - + Port = Convert.ToUInt16(Globals.appSettings.RABBITMQ_PORT), // 5672 + AutomaticRecoveryEnabled = recovery, + RequestedHeartbeat = TimeSpan.FromSeconds(heartbeat) }; // A developer may still want to checkout old code which uses the old env branch // so just complain loudly for now diff --git a/ClassTranscribeServer/Controllers/CaptionsController.cs b/ClassTranscribeServer/Controllers/CaptionsController.cs index 499e17b..35d4af1 100644 --- a/ClassTranscribeServer/Controllers/CaptionsController.cs +++ b/ClassTranscribeServer/Controllers/CaptionsController.cs @@ -108,7 +108,7 @@ public async Task> PostCaption(Caption modifiedCaption) }; _context.Captions.Add(newCaption); await _context.SaveChangesAsync(); - _wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId); + // nope _wakeDownloader.UpdateVTTFile(oldCaption.TranscriptionId); return newCaption; } @@ -337,7 +337,7 @@ public async Task>> PostCaptionFile(IFormFile await _context.Captions.AddRangeAsync(captions); await _context.SaveChangesAsync(); - _wakeDownloader.UpdateVTTFile(transcription.Id); + //nope _wakeDownloader.UpdateVTTFile(transcription.Id); return captions; } diff --git a/ClassTranscribeServer/Utils/WakeDownloader.cs b/ClassTranscribeServer/Utils/WakeDownloader.cs index 9cff435..b2c939a 100644 --- a/ClassTranscribeServer/Utils/WakeDownloader.cs +++ b/ClassTranscribeServer/Utils/WakeDownloader.cs @@ -31,15 +31,15 @@ public virtual void UpdatePlaylist(string playlistId) Wake(msg); } - public virtual void UpdateVTTFile(string transcriptionId) - { - JObject msg = new JObject - { - { "Type", TaskType.GenerateVTTFile.ToString() }, - { "TranscriptionId", transcriptionId } - }; - Wake(msg); - } + // public virtual void UpdateVTTFile(string transcriptionId) + // { + // JObject msg = new JObject + // { + // { "Type", TaskType.GenerateVTTFile.ToString() }, + // { "TranscriptionId", transcriptionId } + // }; + // Wake(msg); + // } public void UpdateOffering(string offeringId) { diff --git a/PythonRpcServer/kaltura.py b/PythonRpcServer/kaltura.py index 41528b9..4120461 100644 --- a/PythonRpcServer/kaltura.py +++ b/PythonRpcServer/kaltura.py @@ -16,7 +16,7 @@ KALTURA_PARTNER_ID = int(os.getenv('KALTURA_PARTNER_ID', default=0)) KALTURA_TOKEN_ID = os.getenv('KALTURA_TOKEN_ID', default=None) KATLURA_APP_TOKEN = os.getenv('KALTURA_APP_TOKEN', default=None) - +KALTURA_TIMEOUT= int( os.getenv('KALTURA_TIMEOUT', default=30)) # Examples of Playlists URLs the user is likely to see- # Playlist 1_eilnj5er is Angrave's short set of example vidos @@ -181,7 +181,7 @@ def getMediaInfosForKalturaChannel(self, partnerInfo, channelId): return self.getSensibleMediaInfos(res) def downloadLecture(self, url): - filePath, extension = download_file(url) + filePath, extension = download_file(url, timeout=KALTURA_TIMEOUT) return filePath, extension #Exxpects diff --git a/PythonRpcServer/utils.py b/PythonRpcServer/utils.py index 1cc7996..39b02a7 100644 --- a/PythonRpcServer/utils.py +++ b/PythonRpcServer/utils.py @@ -87,12 +87,12 @@ def extension_from_magic_bytes(filepath): # Filepath and cookies may be specified # Returns a two tuple, [filepath, extension] # An appropriate Extension is guessed based on the mimetype in the 'content-type' response header -def download_file(url, filepath=None, cookies=None): +def download_file(url, filepath=None, cookies=None, timeout=60): # NOTE the stream=True parameter below if not filepath: filepath = getTmpFile() extension = None - with requests.get(url, stream=True, allow_redirects=True, cookies=cookies) as r: + with requests.get(url, stream=True, allow_redirects=True, cookies=cookies, timeout=timeout) as r: r.raise_for_status() diff --git a/TaskEngine/Program.cs b/TaskEngine/Program.cs index 4c85cff..a8e1c40 100644 --- a/TaskEngine/Program.cs +++ b/TaskEngine/Program.cs @@ -1,16 +1,20 @@ -using ClassTranscribeDatabase; -using ClassTranscribeDatabase.Services; -using ClassTranscribeDatabase.Services.MSTranscription; +using System; +using System.Threading; +using System.Threading.Tasks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System; -using TaskEngine.Tasks; -using System.Threading; + using Newtonsoft.Json.Linq; + using static ClassTranscribeDatabase.CommonUtils; +using ClassTranscribeDatabase; +using ClassTranscribeDatabase.Services; +using ClassTranscribeDatabase.Services.MSTranscription; + +using TaskEngine.Tasks; namespace TaskEngine { @@ -29,14 +33,32 @@ class Program public static ServiceProvider _serviceProvider; public static ILogger _logger; public static void Main() + { + Console.WriteLine("TaskEngine.Main starting up -GetConfigurations..."); + try { + SetupServices(); // should never return + createTaskQueues(); + runQueueAwakerForever(); + + } catch (Exception e) { + // Some paranoia here; we *should* have a logger and exception handler in place + // So this is only here to catch unexpected startup errors that otherwise might be silent + Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\n"); + if(_logger !=null){ + _logger.LogError(e, "Unhandled Exception Caught"); + } + } + } + public static void SetupServices() { var configuration = CTDbContext.GetConfigurations(); // This project relies on Dependency Injection to configure its various services, // For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1 // All the services used are configured using the service provider. + Console.WriteLine("SetupServices() - starting"); - var serviceProvider = new ServiceCollection() + _serviceProvider = new ServiceCollection() .AddLogging(builder => { builder.AddConsole(); @@ -80,10 +102,9 @@ public static void Main() .AddSingleton() .BuildServiceProvider(); - _serviceProvider = serviceProvider; - _logger = serviceProvider.GetRequiredService>(); + _logger = _serviceProvider.GetRequiredService>(); - Globals.appSettings = serviceProvider.GetService>().Value; + Globals.appSettings = _serviceProvider.GetService>().Value; TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings); AppDomain currentDomain = AppDomain.CurrentDomain; @@ -92,112 +113,108 @@ public static void Main() _logger.LogInformation("Seeding database"); // Seed the database, with some initial data. - Seeder seeder = serviceProvider.GetService(); + Seeder seeder = _serviceProvider.GetService(); seeder.Seed(); + } - _logger.LogInformation("Starting TaskEngine"); + + static void runQueueAwakerForever() { + _logger.LogInformation("runQueueAwakerForever - start"); + QueueAwakerTask queueAwakerTask = _serviceProvider.GetService(); + int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES)); + int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES)); + + _logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck); + var timeInterval = new TimeSpan(0, periodicCheck, 0); + + var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0); + _logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval); + // Thread.Sleep(initialPauseInterval); + Task.Delay(initialPauseInterval).Wait(); + // Check for new tasks every "timeInterval". + // The periodic check will discover all undone tasks + // TODO/REVIEW: However some tasks also publish the next items + while (true) + { + try { + _logger.LogInformation("Periodic Check"); + queueAwakerTask.Publish(new JObject + { + { "Type", TaskType.PeriodicCheck.ToString() } + }); + } catch (Exception e) { + _logger.LogError(e, "Error in Periodic Check"); + } + // Thread.Sleep(timeInterval); + Task.Delay(timeInterval).Wait(); + _logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck); + }; + } + static void createTaskQueues() { + _logger.LogInformation("createTaskQueues() -starting"); // Delete any pre-existing queues on rabbitMQ. - RabbitMQConnection rabbitMQ = serviceProvider.GetService(); + RabbitMQConnection rabbitMQ = _serviceProvider.GetService(); // Active queues managed by C# (concurrency > 0) are now purged after the queue is created and before messages are processed ushort concurrent_videotasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY); ushort concurrent_synctasks = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY); ushort concurrent_transcriptions = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY); - ushort concurrent_describe_images = 1; - ushort concurrent_describe_videos = 1; - + ushort concurrent_describe_images = NO_CONCURRENCY; + ushort concurrent_describe_videos = NO_CONCURRENCY; // Create and start consuming from all queues. If concurrency >=1 the queues are purged - // Upstream Sync related _logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={concurrent_synctasks} "); - serviceProvider.GetService().Consume(concurrent_synctasks); - serviceProvider.GetService().Consume(concurrent_synctasks); + _serviceProvider.GetService().Consume(concurrent_synctasks); + _serviceProvider.GetService().Consume(concurrent_synctasks); // Transcription Related _logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={concurrent_transcriptions} "); - serviceProvider.GetService().Consume(concurrent_transcriptions); + _serviceProvider.GetService().Consume(concurrent_transcriptions); - // no more! - serviceProvider.GetService().Consume(concurrent_transcriptions); + // no more! - _serviceProvider.GetService().Consume(concurrent_transcriptions); // Video Processing Related _logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={concurrent_videotasks} "); - serviceProvider.GetService().Consume(concurrent_videotasks); + _serviceProvider.GetService().Consume(concurrent_videotasks); // Descriptions - serviceProvider.GetService().Consume(concurrent_describe_videos); - serviceProvider.GetService().Consume(concurrent_describe_images); - - + _serviceProvider.GetService().Consume(concurrent_describe_videos); + _serviceProvider.GetService().Consume(concurrent_describe_images); // SceneDetection now handled by native Python // See https://github.com/classtranscribe/pyapi - serviceProvider.GetService().Consume(DISABLED_TASK); + _serviceProvider.GetService().Consume(DISABLED_TASK); // We dont want concurrency for these tasks _logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers."); - serviceProvider.GetService().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY? - // does nothing at the moment serviceProvider.GetService().Consume(NO_CONCURRENCY); - serviceProvider.GetService().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode); + _serviceProvider.GetService().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY? + // does nothing at the moment _serviceProvider.GetService().Consume(NO_CONCURRENCY); + _serviceProvider.GetService().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode); // Elastic Search index should be built after TranscriptionTask - serviceProvider.GetService().Consume(NO_CONCURRENCY); + _serviceProvider.GetService().Consume(NO_CONCURRENCY); // Outdated Elastic Search index would be removed - serviceProvider.GetService().Consume(NO_CONCURRENCY); + _serviceProvider.GetService().Consume(NO_CONCURRENCY); - serviceProvider.GetService().Consume(NO_CONCURRENCY); + _serviceProvider.GetService().Consume(NO_CONCURRENCY); - serviceProvider.GetService().Consume(DISABLED_TASK); - - _logger.LogInformation("Done creating task consumers"); - //nolonger used : - // nope serviceProvider.GetService().Consume(concurrent_videotasks); - - bool hacktest = false; - if (hacktest) - { - TempCode tempCode = serviceProvider.GetService(); - tempCode.Temp(); - return; - } - _logger.LogInformation("All done!"); - - QueueAwakerTask queueAwakerTask = serviceProvider.GetService(); - - int periodicCheck = Math.Max(1,Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES)); - int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES)); - - _logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck); - var timeInterval = new TimeSpan(0, periodicCheck, 0); - - var initialPauseInterval = new TimeSpan(0, initialPauseMinutes, 0); - _logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseInterval); - - Thread.Sleep(initialPauseInterval); - - // Check for new tasks every "timeInterval". - // The periodic check will discover all undone tasks - // TODO/REVIEW: However some tasks also publish the next items - while (true) - { - queueAwakerTask.Publish(new JObject - { - { "Type", TaskType.PeriodicCheck.ToString() } - }); - Thread.Sleep(timeInterval); - }; + _serviceProvider.GetService().Consume(DISABLED_TASK); + _logger.LogInformation("createTaskQueues() - Done creating task consumers"); } - // Catch all unhandled exceptions. static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args) { Exception e = (Exception)args.ExceptionObject; - _logger.LogError(e, "Unhandled Exception Caught"); + Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}"); + if(_logger !=null){ + _logger.LogError(e, "Unhandled Exception Caught"); + } } private static ushort ToUInt16(String val, ushort defaultVal) diff --git a/TaskEngine/Tasks/DownloadPlaylistInfoTask.cs b/TaskEngine/Tasks/DownloadPlaylistInfoTask.cs index 932b3f3..a85ae98 100644 --- a/TaskEngine/Tasks/DownloadPlaylistInfoTask.cs +++ b/TaskEngine/Tasks/DownloadPlaylistInfoTask.cs @@ -66,7 +66,7 @@ protected override async Task OnConsume(string playlistId, TaskParameters taskPa } catch(Exception) { // ignored (e.g. no media). Tried DefaultIfEmpty but that threw an Entity Framework runtime error; hence this slightly clunky exception implementation } - GetLogger().LogInformation($"Playlist {playlistId}: Starting index = {index}"); + GetLogger().LogInformation($"Playlist {playlistId}: Starting index={index}, SourceType={playlist.SourceType}"); List medias = new List(); switch (playlist.SourceType) { @@ -105,6 +105,7 @@ public async Task> GetKalturaPlaylist(Playlist playlist, int index, CTGrpc.JsonString jsonString = null; try { + GetLogger().LogError($"playlist=({playlist.Id}):GetKalturaChannelEntriesRPCAsync({playlist.PlaylistIdentifier}) - rpc starting"); jsonString = await _rpcClient.PythonServerClient.GetKalturaChannelEntriesRPCAsync(new CTGrpc.PlaylistRequest { Url = playlist.PlaylistIdentifier @@ -121,6 +122,8 @@ public async Task> GetKalturaPlaylist(Playlist playlist, int index, GetLogger().LogError($"playlist=({playlist.Id}):{e.Message}"); } return newMedia; + } finally { + GetLogger().LogError($"playlist=({playlist.Id}):GetKalturaChannelEntriesRPCAsync({playlist.PlaylistIdentifier}) - rpc complete"); } JArray jArray = JArray.Parse(jsonString.Json); diff --git a/UnitTests/GlobalFixture.cs b/UnitTests/GlobalFixture.cs index ded57e4..1376720 100644 --- a/UnitTests/GlobalFixture.cs +++ b/UnitTests/GlobalFixture.cs @@ -37,7 +37,7 @@ public GlobalFixture() Directory.CreateDirectory(_testDataDirectory); var mockWake = new Mock(MockBehavior.Strict, null); - mockWake.Setup(wake => wake.UpdateVTTFile(It.IsAny())); + // mockWake.Setup(wake => wake.UpdateVTTFile(It.IsAny())); mockWake.Setup(wake => wake.UpdatePlaylist(It.IsAny())); mockWake.Setup(wake => wake.SceneDetection(It.IsAny(), It.IsAny()));