@@ -27,6 +27,7 @@ limitations under the License.
2727#include " core/common/metrics.h"
2828#include " core/runtime/dit_master.h"
2929#include " core/runtime/llm_master.h"
30+ #include " core/runtime/rec_master.h"
3031#include " core/runtime/vlm_master.h"
3132#include " core/util/closure_guard.h"
3233#include " embedding.pb.h"
@@ -68,6 +69,9 @@ APIService::APIService(Master* master,
6869 image_generation_service_impl_ =
6970 std::make_unique<ImageGenerationServiceImpl>(
7071 dynamic_cast <DiTMaster*>(master), model_names);
72+ } else if (FLAGS_backend == " rec" ) {
73+ rec_completion_service_impl_ = std::make_unique<RecCompletionServiceImpl>(
74+ dynamic_cast <RecMaster*>(master), model_names);
7175 }
7276 models_service_impl_ =
7377 ServiceImplFactory<ModelsServiceImpl>::create_service_impl (
@@ -78,13 +82,6 @@ void APIService::Completions(::google::protobuf::RpcController* controller,
7882 const proto::CompletionRequest* request,
7983 proto::CompletionResponse* response,
8084 ::google::protobuf::Closure* done) {
81- // TODO with xllm-service
82- }
83-
84- void APIService::CompletionsHttp (::google::protobuf::RpcController* controller,
85- const proto::HttpRequest* request,
86- proto::HttpResponse* response,
87- ::google::protobuf::Closure* done) {
8885 xllm::ClosureGuard done_guard (
8986 done,
9087 std::bind (request_in_metric, nullptr ),
@@ -93,47 +90,38 @@ void APIService::CompletionsHttp(::google::protobuf::RpcController* controller,
9390 LOG (ERROR) << " brpc request | respose | controller is null" ;
9491 return ;
9592 }
96-
97- auto arena = response->GetArena ();
98- auto req_pb =
99- google::protobuf::Arena::CreateMessage<proto::CompletionRequest>(arena);
100- auto resp_pb =
101- google::protobuf::Arena::CreateMessage<proto::CompletionResponse>(arena);
102-
10393 auto ctrl = reinterpret_cast <brpc::Controller*>(controller);
104- std::string error;
105- json2pb::Json2PbOptions options;
106- butil::IOBuf& buf = ctrl->request_attachment ();
107- butil::IOBufAsZeroCopyInputStream iobuf_stream (buf);
108- auto st = json2pb::JsonToProtoMessage (&iobuf_stream, req_pb, options, &error);
109- if (!st) {
110- ctrl->SetFailed (error);
111- LOG (ERROR) << " parse json to proto failed: " << error;
112- return ;
113- }
11494
115- std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
116- ctrl, done_guard.release (), req_pb, resp_pb);
117- completion_service_impl_->process_async (call);
118- }
119-
120- void APIService::ChatCompletions (::google::protobuf::RpcController* controller,
121- const proto::ChatRequest* request,
122- proto::ChatResponse* response,
123- ::google::protobuf::Closure* done) {
124- // TODO with xllm-service
95+ if (FLAGS_backend == " llm" ) {
96+ CHECK (completion_service_impl_) << " completion service is invalid." ;
97+ std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
98+ ctrl,
99+ done_guard.release (),
100+ const_cast <proto::CompletionRequest*>(request),
101+ response);
102+ completion_service_impl_->process_async (call);
103+ } else if (FLAGS_backend == " rec" ) {
104+ CHECK (rec_completion_service_impl_)
105+ << " rec completion service is invalid." ;
106+ std::shared_ptr<Call> call = std::make_shared<CompletionCall>(
107+ ctrl,
108+ done_guard.release (),
109+ const_cast <proto::CompletionRequest*>(request),
110+ response);
111+ rec_completion_service_impl_->process_async (call);
112+ }
125113}
126114
127115namespace {
128- template <typename ChatCall , typename Service>
129- void ChatCompletionsImpl (std::unique_ptr<Service>& service,
130- xllm::ClosureGuard& guard,
131- ::google::protobuf::Arena* arena,
132- brpc::Controller* ctrl) {
116+ template <typename Call , typename Service>
117+ void CommonCompletionsImpl (std::unique_ptr<Service>& service,
118+ xllm::ClosureGuard& guard,
119+ ::google::protobuf::Arena* arena,
120+ brpc::Controller* ctrl) {
133121 auto req_pb =
134- google::protobuf::Arena::CreateMessage<typename ChatCall ::ReqType>(arena);
122+ google::protobuf::Arena::CreateMessage<typename Call ::ReqType>(arena);
135123 auto resp_pb =
136- google::protobuf::Arena::CreateMessage<typename ChatCall ::ResType>(arena);
124+ google::protobuf::Arena::CreateMessage<typename Call ::ResType>(arena);
137125
138126 std::string error;
139127 json2pb::Json2PbOptions options;
@@ -146,12 +134,46 @@ void ChatCompletionsImpl(std::unique_ptr<Service>& service,
146134 return ;
147135 }
148136
149- auto call =
150- std::make_shared<ChatCall>(ctrl, guard.release (), req_pb, resp_pb);
137+ auto call = std::make_shared<Call>(ctrl, guard.release (), req_pb, resp_pb);
151138 service->process_async (call);
152139}
153140} // namespace
154141
142+ void APIService::CompletionsHttp (::google::protobuf::RpcController* controller,
143+ const proto::HttpRequest* request,
144+ proto::HttpResponse* response,
145+ ::google::protobuf::Closure* done) {
146+ xllm::ClosureGuard done_guard (
147+ done,
148+ std::bind (request_in_metric, nullptr ),
149+ std::bind (request_out_metric, (void *)controller));
150+ if (!request || !response || !controller) {
151+ LOG (ERROR) << " brpc request | respose | controller is null" ;
152+ return ;
153+ }
154+
155+ auto arena = response->GetArena ();
156+ auto ctrl = reinterpret_cast <brpc::Controller*>(controller);
157+
158+ if (FLAGS_backend == " llm" ) {
159+ CHECK (completion_service_impl_) << " completion service is invalid." ;
160+ CommonCompletionsImpl<CompletionCall, CompletionServiceImpl>(
161+ completion_service_impl_, done_guard, arena, ctrl);
162+ } else if (FLAGS_backend == " rec" ) {
163+ CHECK (rec_completion_service_impl_)
164+ << " rec completion service is invalid." ;
165+ CommonCompletionsImpl<CompletionCall, RecCompletionServiceImpl>(
166+ rec_completion_service_impl_, done_guard, arena, ctrl);
167+ }
168+ }
169+
170+ void APIService::ChatCompletions (::google::protobuf::RpcController* controller,
171+ const proto::ChatRequest* request,
172+ proto::ChatResponse* response,
173+ ::google::protobuf::Closure* done) {
174+ // TODO with xllm-service
175+ }
176+
155177void APIService::ChatCompletionsHttp (
156178 ::google::protobuf::RpcController* controller,
157179 const proto::HttpRequest* request,
@@ -171,11 +193,11 @@ void APIService::ChatCompletionsHttp(
171193
172194 if (FLAGS_backend == " llm" ) {
173195 CHECK (chat_service_impl_) << " chat service is invalid." ;
174- ChatCompletionsImpl <ChatCall, ChatServiceImpl>(
196+ CommonCompletionsImpl <ChatCall, ChatServiceImpl>(
175197 chat_service_impl_, done_guard, arena, ctrl);
176198 } else if (FLAGS_backend == " vlm" ) {
177199 CHECK (mm_chat_service_impl_) << " mm chat service is invalid." ;
178- ChatCompletionsImpl <MMChatCall, MMChatServiceImpl>(
200+ CommonCompletionsImpl <MMChatCall, MMChatServiceImpl>(
179201 mm_chat_service_impl_, done_guard, arena, ctrl);
180202 }
181203}
0 commit comments