Skip to content

Commit

Permalink
BCS application launcher Config (#49)
Browse files Browse the repository at this point in the history
* Add Config structures

* gRPC client and service - configuration structure serialization and deserialization

this commit includes the essential functions to
convert the NMOS configuration structures to a linear
vector. preparing to transmit it over protobuf protocol.
And includes the essential functions to convert
linear vector NMOS configuration to structured configurations.

Signed-off-by: Aly, Walid <[email protected]>

* Add ffmpeg pipeline generator files

Signed-off-by: Tomasz Szumski <[email protected]>

* update gRPC service

update gRPC service

Signed-off-by: Aly, Walid <[email protected]>

* Add serialization and deserialization functions for Config using nlohmann/json

* Send json as string via gRPC

Signed-off-by: Tomasz Szumski <[email protected]>

* Add commitConfigs function declaration to FFmpeg_wrapper_client.h and include onfig_params.hpp header

* Append static to not to expose not needed symbols, fallback to previous serialization method is json fails

---------

Signed-off-by: Aly, Walid <[email protected]>
Signed-off-by: Tomasz Szumski <[email protected]>
Co-authored-by: Aly, Walid <[email protected]>
  • Loading branch information
tszumski and walidbarakat authored Jan 24, 2025
1 parent 56d55c7 commit e4791a0
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 7 deletions.
12 changes: 9 additions & 3 deletions gRPC/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ add_custom_command(
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")

include(FetchContent)
FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz)
FetchContent_MakeAvailable(json)

# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
Expand All @@ -74,7 +78,8 @@ target_link_libraries(FFmpeg_wrapper_client
absl::check
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
${_PROTOBUF_LIBPROTOBUF}
nlohmann_json::nlohmann_json)

add_executable(cmd_pass_client cmd_pass_client.cc)
target_link_libraries(cmd_pass_client
Expand All @@ -88,7 +93,7 @@ add_executable(cmd_pass_client cmd_pass_client.cc)
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

add_executable(FFmpeg_wrapper_service FFmpeg_wrapper_service.cc CmdPassImpl.cc)
add_executable(FFmpeg_wrapper_service FFmpeg_wrapper_service.cc CmdPassImpl.cc ffmpeg_pipeline_generator.cpp)
target_link_libraries(FFmpeg_wrapper_service
hw_grpc_proto
absl::check
Expand All @@ -97,4 +102,5 @@ add_executable(FFmpeg_wrapper_service FFmpeg_wrapper_service.cc CmdPassImpl.cc)
absl::log
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
${_PROTOBUF_LIBPROTOBUF}
nlohmann_json::nlohmann_json)
165 changes: 161 additions & 4 deletions gRPC/CmdPassImpl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,150 @@

#include "ffmpeg_pipeline_generator.hpp"
#include "config_serialize_deserialize.hpp"
#include <sstream>
#include "CmdPassImpl.h"

// Helper function to convert string to any type
template <typename T>
T from_string(const std::string& str) {
std::istringstream iss(str);
T value;
iss >> value;
return value;
}

// Function to convert vector of string pairs to FrameRate
static FrameRate stringPairsToFrameRate(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
FrameRate frameRate;
frameRate.numerator = from_string<int>(pairs.at(prefix + "frame_rate_numerator"));
frameRate.denominator = from_string<int>(pairs.at(prefix + "frame_rate_denominator"));
return frameRate;
}

// Function to convert vector of string pairs to Video
static Video stringPairsToVideo(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
Video video;
video.frame_width = from_string<int>(pairs.at(prefix + "frame_width"));
video.frame_height = from_string<int>(pairs.at(prefix + "frame_height"));
video.pixel_format = pairs.at(prefix + "pixel_format");
video.video_type = pairs.at(prefix + "video_type");
video.frame_rate = stringPairsToFrameRate(pairs, prefix);
return video;
}

// Function to convert vector of string pairs to Audio
static Audio stringPairsToAudio(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
Audio audio;
audio.channels = from_string<int>(pairs.at(prefix + "channels"));
audio.sample_rate = from_string<int>(pairs.at(prefix + "sample_rate"));
audio.format = pairs.at(prefix + "format");
audio.packet_time = pairs.at(prefix + "packet_time");
return audio;
}

// Function to convert vector of string pairs to File
static File stringPairsToFile(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
File file;
file.path = pairs.at(prefix + "file_path");
file.filename = pairs.at(prefix + "file_filename");
return file;
}

// Function to convert vector of string pairs to ST2110
static ST2110 stringPairsToST2110(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
ST2110 st2110;
st2110.network_interface = pairs.at(prefix + "network_interface");
st2110.local_ip = pairs.at(prefix + "local_ip");
st2110.remote_ip = pairs.at(prefix + "remote_ip");
st2110.transport = pairs.at(prefix + "transport");
st2110.remote_port = from_string<int>(pairs.at(prefix + "remote_port"));
st2110.payload_type = from_string<int>(pairs.at(prefix + "st_payload_type"));
return st2110;
}

// Function to convert vector of string pairs to MCM
static MCM stringPairsToMCM(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
MCM mcm;
mcm.conn_type = pairs.at(prefix + "conn_type");
mcm.transport = pairs.at(prefix + "transport");
mcm.transport_pixel_format = pairs.at(prefix + "transport_pixel_format");
mcm.ip = pairs.at(prefix + "ip");
mcm.port = from_string<int>(pairs.at(prefix + "port"));
mcm.urn = pairs.at(prefix + "urn");
return mcm;
}

// Function to convert vector of string pairs to Payload
static Payload stringPairsToPayload(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
Payload payload;
payload.type = static_cast<payload_type>(from_string<int>(pairs.at(prefix + "payload_type")));
if (payload.type == video) {
payload.video = stringPairsToVideo(pairs, prefix);
} else if (payload.type == audio) {
payload.audio = stringPairsToAudio(pairs, prefix);
}
return payload;
}

// Function to convert vector of string pairs to StreamType
static StreamType stringPairsToStreamType(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
StreamType streamType;
streamType.type = static_cast<stream_type>(from_string<int>(pairs.at(prefix + "stream_type")));
if (streamType.type == file) {
streamType.file = stringPairsToFile(pairs, prefix);
} else if (streamType.type == st2110) {
streamType.st2110 = stringPairsToST2110(pairs, prefix);
} else if (streamType.type == mcm) {
streamType.mcm = stringPairsToMCM(pairs, prefix);
}
return streamType;
}

// Function to convert vector of string pairs to Stream
static Stream stringPairsToStream(const std::unordered_map<std::string, std::string>& pairs, const std::string& prefix) {
Stream stream;
stream.payload = stringPairsToPayload(pairs, prefix);
stream.stream_type = stringPairsToStreamType(pairs, prefix);
return stream;
}

// Function to convert vector of string pairs to Config
static Config stringPairsToConfig(const std::vector<std::pair<std::string, std::string>>& pairs) {
Config config;

if (pairs.front().first == "json") {
if (deserialize_config_json(config, pairs.front().second) != 0) {
std::cout << "Error deserializing Config from json, trying previous solution" << std::endl;
}
else {
return config;
}
}

std::unordered_map<std::string, std::string> pairs_map(pairs.begin(), pairs.end());
config.function = pairs_map.at("function");
config.gpu_hw_acceleration = pairs_map.at("gpu_hw_acceleration");
config.logging_level = from_string<int>(pairs_map.at("logging_level"));

// Extract senders and receivers
size_t sender_index = 0;
size_t receiver_index = 0;
while (true) {
std::string sender_prefix = "sender_" + std::to_string(sender_index) + "_";
std::string receiver_prefix = "receiver_" + std::to_string(receiver_index) + "_";
if (pairs_map.find(sender_prefix + "payload_type") != pairs_map.end()) {
config.senders.push_back(stringPairsToStream(pairs_map, sender_prefix));
++sender_index;
} else if (pairs_map.find(receiver_prefix + "payload_type") != pairs_map.end()) {
config.receivers.push_back(stringPairsToStream(pairs_map, receiver_prefix));
++receiver_index;
} else {
break;
}
}
return config;
}

void CmdPassImpl::Run(std::string server_address) {
ServerBuilder builder;
stop = false;
Expand Down Expand Up @@ -47,7 +192,8 @@ void CmdPassImpl::CallData::Proceed() {
new CallData(service_, cq_);

std::stringstream ss;
ss << "ffmpeg";
std::string pipelinie_string;
std::vector<std::pair<std::string, std::string>> committed_config;

if (request_.obj().empty()) {
responder_.Finish(response_,
Expand All @@ -61,15 +207,26 @@ void CmdPassImpl::CallData::Proceed() {
}

for (const auto &cmd : request_.obj()) {
ss << " -" << cmd.cmd_key() << " " << cmd.cmd_val();
committed_config.push_back(std::make_pair(cmd.cmd_key(), cmd.cmd_val()));
}

std::string ffmpeg_full_cmd = ss.str();
Config recieved_config = stringPairsToConfig(committed_config);

if (ffmpeg_generate_pipeline(recieved_config, pipelinie_string) != 0) {
pipelinie_string.clear();
std::cout << "Error generating pipeline" << std::endl; //TODO : need to return as response error code
//return 1;
}

ss << "ffmpeg ";
ss << pipelinie_string;

pipelinie_string = ss.str();

std::array<char, 128> buffer;
std::string result;

FILE *pipe = popen(ffmpeg_full_cmd.c_str(), "r");
FILE *pipe = popen(pipelinie_string.c_str(), "r");
if (!pipe) { /* FFmpeg pipeline/execution failed i.e memory allocation */
responder_.Finish(response_,
Status(grpc::INTERNAL, FFMPEG_APP_EXEC_FAIL_STATUS,
Expand Down
113 changes: 113 additions & 0 deletions gRPC/FFmpeg_wrapper_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
* SPDX-License-Identifier: BSD-3-Clause
*/

#include "config_params.hpp"
#include "FFmpeg_wrapper_client.h"
#include "build/ffmpeg_cmd_wrap.pb.h"
#include "config_serialize_deserialize.hpp"
#include <sstream>
#include <utility>
#include <string>
#include <iostream>

CmdPassClient::CmdPassClient(std::string interface, std::string port) : pending_requests_(0) {
std::cout << "------ [start] initiate channel --------" << std::endl;
Expand Down Expand Up @@ -90,3 +93,113 @@ void CmdPassClient::AsyncCompleteRpc() {
void CmdPassClient::WaitForAllRequests() {
all_tasks_completed.wait(false);
}

// Helper function to convert any type to string
template <typename T>
std::string to_string(const T& value) {
std::ostringstream oss;
oss << value;
return oss.str();
}

// Function to convert FrameRate to vector of string pairs
static void frameRateToStringPairs(const FrameRate& frameRate, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "frame_rate_numerator", to_string<int>(frameRate.numerator)});
result.push_back({prefix + "frame_rate_denominator", to_string<int>(frameRate.denominator)});
}

// Function to convert Video to vector of string pairs
static void videoToStringPairs(const Video& video, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "frame_width", to_string<int>(video.frame_width)});
result.push_back({prefix + "frame_height", to_string<int>(video.frame_height)});
result.push_back({prefix + "pixel_format", video.pixel_format});
result.push_back({prefix + "video_type", video.video_type});
frameRateToStringPairs(video.frame_rate, result, prefix);
}

// Function to convert Audio to vector of string pairs
static void audioToStringPairs(const Audio& audio, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "channels", to_string<int>(audio.channels)});
result.push_back({prefix + "sample_rate", to_string<int>(audio.sample_rate)});
result.push_back({prefix + "format", audio.format});
result.push_back({prefix + "packet_time", audio.packet_time});
}

// Function to convert File to vector of string pairs
static void fileToStringPairs(const File& file, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "file_path", file.path});
result.push_back({prefix + "file_filename", file.filename});
}

// Function to convert ST2110 to vector of string pairs
static void st2110ToStringPairs(const ST2110& st2110, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "network_interface", st2110.network_interface});
result.push_back({prefix + "local_ip", st2110.local_ip});
result.push_back({prefix + "remote_ip", st2110.remote_ip});
result.push_back({prefix + "transport", st2110.transport});
result.push_back({prefix + "remote_port", to_string<int>(st2110.remote_port)});
result.push_back({prefix + "st_payload_type", to_string<int>(st2110.payload_type)});
}

// Function to convert MCM to vector of string pairs
static void mcmToStringPairs(const MCM& mcm, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "conn_type", mcm.conn_type});
result.push_back({prefix + "transport", mcm.transport});
result.push_back({prefix + "transport_pixel_format", mcm.transport_pixel_format});
result.push_back({prefix + "ip", mcm.ip});
result.push_back({prefix + "port", to_string<int>(mcm.port)});
result.push_back({prefix + "urn", mcm.urn});
}

// Function to convert Payload to vector of string pairs
static void payloadToStringPairs(const Payload& payload, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "payload_type", to_string<int>(payload.type)});
if (payload.type == video) {
videoToStringPairs(payload.video, result, prefix);
} else if (payload.type == audio) {
audioToStringPairs(payload.audio, result, prefix);
}
}

// Function to convert StreamType to vector of string pairs
static void streamTypeToStringPairs(const StreamType& streamType, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
result.push_back({prefix + "stream_type", to_string<int>(streamType.type)});
if (streamType.type == file) {
fileToStringPairs(streamType.file, result, prefix);
} else if (streamType.type == st2110) {
st2110ToStringPairs(streamType.st2110, result, prefix);
} else if (streamType.type == mcm) {
mcmToStringPairs(streamType.mcm, result, prefix);
}
}

// Function to convert Stream to vector of string pairs
static void streamToStringPairs(const Stream& stream, std::vector<std::pair<std::string, std::string>>& result, const std::string& prefix) {
payloadToStringPairs(stream.payload, result, prefix);
streamTypeToStringPairs(stream.stream_type, result, prefix);
}

// Function to convert Config to vector of string pairs
std::vector<std::pair<std::string, std::string>> commitConfigs(const Config& config) {
std::vector<std::pair<std::string, std::string>> result;

std::string json_str;
if (serialize_config_json(config, json_str) == 0) {
result.push_back({"json", json_str});
}
else {
std::cout << "Error serializing Config to json, trying previos solution" << std::endl;

result.push_back({"function", config.function});
result.push_back({"gpu_hw_acceleration", config.gpu_hw_acceleration});
result.push_back({"logging_level", to_string<int>(config.logging_level)});
for (size_t i = 0; i < config.senders.size(); ++i) {
streamToStringPairs(config.senders[i], result, "sender_" + to_string<int>(i) + "_");
}
for (size_t i = 0; i < config.receivers.size(); ++i) {
streamToStringPairs(config.receivers[i], result, "receiver_" + to_string<int>(i) + "_");
}
};

return result;
}
3 changes: 3 additions & 0 deletions gRPC/FFmpeg_wrapper_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include "build/ffmpeg_cmd_wrap.grpc.pb.h"
#include "config_params.hpp"

using grpc::Channel;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;

std::vector<std::pair<std::string, std::string>> commitConfigs(const Config& config);

class CmdPassClient {
public:
CmdPassClient(std::string interface, std::string port);
Expand Down
Loading

0 comments on commit e4791a0

Please sign in to comment.