Skip to content

Commit

Permalink
Updates and Testing
Browse files Browse the repository at this point in the history
  • Loading branch information
angrave committed Oct 16, 2024
1 parent 12ee8f2 commit 5cdc50e
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 32 deletions.
6 changes: 4 additions & 2 deletions ClassTranscribeDatabase/CommonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum TaskType
DownloadPlaylistInfo = 3,
DownloadMedia = 4,
ConvertMedia = 5,
TranscribeVideo = 6,
// TranscribeVideo = 6,
ProcessVideo = 7,
Aggregator = 8,
GenerateVTTFile = 9,
Expand All @@ -39,7 +39,9 @@ public enum TaskType
PythonCrawler = 19,

DescribeVideo = 20,
DescribeImage = 21
DescribeImage = 21,
AzureTranscribeVideo = 22,
LocalTranscribeVideo = 23

}

Expand Down
4 changes: 2 additions & 2 deletions ClassTranscribeServer/Controllers/PlaylistsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public async Task<ActionResult<IEnumerable<PlaylistDTO>>> GetPlaylists2(string o
JsonMetadata = m.JsonMetadata,
CreatedAt = m.CreatedAt,
SceneDetectReady = m.Video.HasSceneObjectData(),
Ready = m.Video != null && "NoError" == m.Video.TranscriptionStatus ,
Ready = m.Video != null && Video.TranscriptionStatusMessages.NOERROR == m.Video.TranscriptionStatus ,
SourceType = m.SourceType,
Duration = m.Video?.Duration,
PublishStatus = m.PublishStatus,
Expand Down Expand Up @@ -265,7 +265,7 @@ public async Task<ActionResult<PlaylistDTO>> GetPlaylist(string id)
PublishStatus = m.PublishStatus,
Options = m.GetOptionsAsJson(),
SceneDetectReady = m.Video != null && m.Video.HasSceneObjectData(),
Ready = m.Video != null && "NoError" == m.Video.TranscriptionStatus ,
Ready = m.Video != null && Video.TranscriptionStatusMessages.NOERROR == m.Video.TranscriptionStatus ,
Video = m.Video == null ? null : new VideoDTO
{
Id = m.Video.Id,
Expand Down
2 changes: 1 addition & 1 deletion ClassTranscribeServer/Utils/WakeDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void TranscribeVideo(string videoOrMediaId, bool deleteExisting)
{
JObject msg = new JObject
{
{ "Type", TaskType.TranscribeVideo.ToString() },
{ "Type", TaskType.LocalTranscribeVideo.ToString() },
{ "videoOrMediaId", videoOrMediaId },
{ "DeleteExisting", deleteExisting }
};
Expand Down
2 changes: 1 addition & 1 deletion PythonRpcServer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def serve():
# Until we can ensure no timeouts on remote services, the default here is set to a conservative low number
# This is to ensure we can still make progress even if every python tasks tries to use all cpu cores.
max_workers=int(os.getenv('NUM_PYTHON_WORKERS', 3))
print(f"max_workers={max_workers}")
print(f"max_workers={max_workers}. Starting up grpc server...")

server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))

Expand Down
37 changes: 19 additions & 18 deletions TaskEngine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,30 +139,31 @@ static void runQueueAwakerForever() {
// TODO/REVIEW: However some tasks also publish the next items
while (true)
{
// try {
// _logger.LogInformation("Periodic Check");
// queueAwakerTask.Publish(new JObject
// {
// { "Type", TaskType.PeriodicCheck.ToString() }
// });
// } catch (Exception e) {
// _logger.LogError(e, "Error in Periodic Check");
// }

try {
var videoId = "ddceb720-a9d6-417d-b5ea-e94c6c0a86c6";
_logger.LogInformation("Transcription Task Initiated");
_logger.LogInformation("Periodic Check");
queueAwakerTask.Publish(new JObject
{
{ "Type", TaskType.TranscribeVideo.ToString() },
{ "videoOrMediaId", videoId }
{ "Type", TaskType.PeriodicCheck.ToString() }
});

_logger.LogInformation("Transcription Task Published Successfully");
} catch (Exception e) {
_logger.LogError(e, "Error in Transcription Task");
_logger.LogError(e, "Error in Periodic Check");
}

// Hacky testing...
// try {
// var videoId = "ddceb720-a9d6-417d-b5ea-e94c6c0a86c6";
// _logger.LogInformation("Transcription Task Initiated");
// queueAwakerTask.Publish(new JObject
// {
// { "Type", TaskType.LocalTranscribeVideo.ToString() },
// { "videoOrMediaId", videoId }
// });

// _logger.LogInformation("Transcription Task Published Successfully");
// } catch (Exception e) {
// _logger.LogError(e, "Error in Transcription Task");
// }


// Thread.Sleep(timeInterval);
Task.Delay(timeInterval).Wait();
Expand Down Expand Up @@ -208,7 +209,7 @@ static void createTaskQueues() {
_serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

// We dont want concurrency for these tasks
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers.");
_logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers!");
_serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY?
// does nothing at the moment _serviceProvider.GetService<UpdateBoxTokenTask>().Consume(NO_CONCURRENCY);
_serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);
Expand Down
2 changes: 1 addition & 1 deletion TaskEngine/Tasks/AzureTranscriptionTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AzureTranscriptionTask : RabbitMQTask<string>
public AzureTranscriptionTask(RabbitMQConnection rabbitMQ, MSTranscriptionService msTranscriptionService,
// GenerateVTTFileTask generateVTTFileTask,
ILogger<AzureTranscriptionTask> logger, CaptionQueries captionQueries)
: base(rabbitMQ, TaskType.TranscribeVideo, logger)
: base(rabbitMQ, TaskType.AzureTranscribeVideo, logger)
{
_msTranscriptionService = msTranscriptionService;
// nope _generateVTTFileTask = generateVTTFileTask;
Expand Down
11 changes: 6 additions & 5 deletions TaskEngine/Tasks/LocalTranscriptionTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public LocalTranscriptionTask(RabbitMQConnection rabbitMQ,
RpcClient rpcClient,
// GenerateVTTFileTask generateVTTFileTask,
ILogger<LocalTranscriptionTask> logger, CaptionQueries captionQueries)
: base(rabbitMQ, TaskType.TranscribeVideo, logger)
: base(rabbitMQ, TaskType.LocalTranscribeVideo, logger)
{
_rpcClient = rpcClient;
_captionQueries = captionQueries;
Expand Down Expand Up @@ -90,16 +90,17 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam
GetLogger().LogInformation($"{videoId}: Updated TranscribingAttempts = {video.TranscribingAttempts}");
try
{
var mockWhisperResult = Globals.appSettings.MOCK_RECOGNITION == "MOCK";

GetLogger().LogInformation($"{videoId}: Calling RecognitionWithVideoStreamAsync");
GetLogger().LogInformation($"{videoId}: Calling RecognitionWithVideoStreamAsync( mock={mockWhisperResult})");

var request = new CTGrpc.TranscriptionRequest
{
LogId = videoId,
FilePath = video.Video1.VMPath,
Model = "en",
Language = "en",
Testing = true
Testing = mockWhisperResult
// PhraseHints = phraseHints,
// CourseHints = "",
// OutputLanguages = "en"
Expand Down Expand Up @@ -161,7 +162,7 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam
{
TranscriptionType = TranscriptionType.Caption,
Captions = theCaptions,
Language = theLanguage,
Language = "en-US" , /* Must be en-US for FrontEnd; Cant be just "en" */
VideoId = video.Id,
Label = $"{theLanguage} (ClassTranscribe)",
SourceInternalRef = SOURCEINTERNALREF, //
Expand All @@ -177,7 +178,7 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam
}


video.TranscriptionStatus = "NoError";
video.TranscriptionStatus = Video.TranscriptionStatusMessages.NOERROR;
// video.JsonMetadata["LastSuccessfulTime"] = result.LastSuccessTime.ToString();

GetLogger().LogInformation($"{videoId}: Saving captions");
Expand Down
2 changes: 1 addition & 1 deletion TaskEngine/Tasks/QueueAwakerTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ protected async override Task OnConsume(JObject jObject, TaskParameters taskPara
var sourceId = jObject["SourceId"].ToString();
_pythonCrawlerTask.Publish(sourceId);
}
else if (type == TaskType.TranscribeVideo.ToString())
else if (type == TaskType.LocalTranscribeVideo.ToString())
{
var id = jObject["videoOrMediaId"].ToString();

Expand Down
2 changes: 1 addition & 1 deletion pythonrpcserver.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
RUN python -m grpc_tools.protoc -I . --python_out=./ --grpc_python_out=./ ct.proto

COPY ./PythonRpcServer .

# The output of this file is used when we set MOCK_RECOGNITION=MOCK for quick testing
RUN whisper -ojf -f transcribe_hellohellohello.wav

CMD [ "nice", "-n", "18", "ionice", "-c", "2", "-n", "6", "python3", "-u", "/PythonRpcServer/server.py" ]
Expand Down

0 comments on commit 5cdc50e

Please sign in to comment.