Skip to content

Commit dec92ea

Browse files
authored
feat: add log watcher endpoint and update test module imports (#768)
1 parent f21776b commit dec92ea

File tree

4 files changed

+560
-15
lines changed

4 files changed

+560
-15
lines changed
Lines changed: 382 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,382 @@
1+
//
2+
// Copyright © 2025 Agora
3+
// This file is part of TEN Framework, an open source project.
4+
// Licensed under the Apache License, Version 2.0, with certain conditions.
5+
// Refer to the "LICENSE" file in the root directory for more information.
6+
//
7+
use std::sync::Arc;
8+
9+
use actix::fut;
10+
use actix::{Actor, AsyncContext, Handler, Message, StreamHandler};
11+
use actix_web::{web, Error, HttpRequest, HttpResponse};
12+
use actix_web_actors::ws;
13+
use anyhow::Result;
14+
use serde::{Deserialize, Serialize};
15+
use serde_json::json;
16+
use tokio::sync::Mutex;
17+
18+
use crate::designer::DesignerState;
19+
use crate::fs::file_watcher::{FileContentStream, FileWatchOptions};
20+
use crate::pkg_info::property::get_log_file_path;
21+
22+
// Message types for WebSocket communication
23+
#[derive(Message, Debug, Serialize, Deserialize)]
24+
#[rtype(result = "()")]
25+
pub struct StopWatching;
26+
27+
#[derive(Message, Debug, Serialize, Deserialize)]
28+
#[rtype(result = "()")]
29+
pub struct SetAppBaseDir {
30+
pub app_base_dir: String,
31+
}
32+
33+
#[derive(Message, Debug, Serialize, Deserialize)]
34+
#[rtype(result = "()")]
35+
pub struct FileContent(pub Vec<u8>);
36+
37+
#[derive(Message, Debug, Serialize, Deserialize)]
38+
#[rtype(result = "()")]
39+
pub struct CloseConnection;
40+
41+
#[derive(Message, Debug, Serialize, Deserialize)]
42+
#[rtype(result = "()")]
43+
pub struct ErrorMessage(pub String);
44+
45+
#[derive(Message, Debug, Serialize, Deserialize)]
46+
#[rtype(result = "()")]
47+
pub struct InfoMessage(pub String);
48+
49+
#[derive(Message)]
50+
#[rtype(result = "()")]
51+
pub struct StoreWatcher(pub FileContentStream);
52+
53+
// WebSocket actor for log file watching.
54+
struct WsLogWatcher {
55+
app_base_dir: Option<String>,
56+
file_watcher: Option<Arc<Mutex<FileContentStream>>>,
57+
}
58+
59+
impl WsLogWatcher {
60+
fn new() -> Self {
61+
Self { app_base_dir: None, file_watcher: None }
62+
}
63+
64+
fn stop_watching(&mut self) {
65+
if let Some(watcher) = &self.file_watcher {
66+
// Create a new task to stop the watcher
67+
let watcher_clone = watcher.clone();
68+
tokio::spawn(async move {
69+
let mut guard = watcher_clone.lock().await;
70+
guard.stop();
71+
});
72+
}
73+
self.file_watcher = None;
74+
}
75+
}
76+
77+
impl Actor for WsLogWatcher {
78+
type Context = ws::WebsocketContext<Self>;
79+
80+
fn started(&mut self, ctx: &mut Self::Context) {
81+
// Send a welcome message that we're ready to receive the app_base_dir.
82+
ctx.text(
83+
json!({
84+
"type": "ready",
85+
"message": "Ready to receive app_base_dir"
86+
})
87+
.to_string(),
88+
);
89+
}
90+
91+
fn stopped(&mut self, _ctx: &mut Self::Context) {
92+
// Just set file_watcher to None to release our reference to it.
93+
// The tokio runtime will clean up any remaining tasks.
94+
self.file_watcher = None;
95+
}
96+
}
97+
98+
impl Handler<SetAppBaseDir> for WsLogWatcher {
99+
type Result = ();
100+
101+
fn handle(
102+
&mut self,
103+
msg: SetAppBaseDir,
104+
ctx: &mut Self::Context,
105+
) -> Self::Result {
106+
// Store the app_base_dir.
107+
self.app_base_dir = Some(msg.app_base_dir.clone());
108+
109+
// Clone what we need for the async task.
110+
let app_base_dir = msg.app_base_dir;
111+
let addr = ctx.address();
112+
113+
// Spawn a task to handle the async file watching.
114+
ctx.spawn(fut::wrap_future(async move {
115+
// Get the log file path from property.json.
116+
let log_file_path = match get_log_file_path(&app_base_dir) {
117+
Some(path) => path,
118+
None => {
119+
let _ = addr.try_send(ErrorMessage(
120+
"No log file specified in property.json".to_string(),
121+
));
122+
let _ = addr.try_send(CloseConnection);
123+
return;
124+
}
125+
};
126+
127+
// Create file watch options.
128+
let options = FileWatchOptions::default();
129+
130+
// Start watching the file.
131+
match crate::fs::file_watcher::watch_file(
132+
&log_file_path,
133+
Some(options),
134+
)
135+
.await
136+
{
137+
Ok(stream) => {
138+
// Successfully started watching.
139+
let _ = addr.try_send(InfoMessage(
140+
"Started watching log file".to_string(),
141+
));
142+
143+
// Send the stream to the actor to store.
144+
let _ = addr.try_send(StoreWatcher(stream));
145+
}
146+
Err(e) => {
147+
let _ = addr.try_send(ErrorMessage(e.to_string()));
148+
let _ = addr.try_send(CloseConnection);
149+
}
150+
}
151+
}));
152+
}
153+
}
154+
155+
impl Handler<FileContent> for WsLogWatcher {
156+
type Result = ();
157+
158+
fn handle(
159+
&mut self,
160+
msg: FileContent,
161+
ctx: &mut Self::Context,
162+
) -> Self::Result {
163+
// Send the file content to the WebSocket client.
164+
ctx.binary(msg.0);
165+
}
166+
}
167+
168+
impl Handler<ErrorMessage> for WsLogWatcher {
169+
type Result = ();
170+
171+
fn handle(
172+
&mut self,
173+
msg: ErrorMessage,
174+
ctx: &mut Self::Context,
175+
) -> Self::Result {
176+
// Send the error message to the WebSocket client.
177+
ctx.text(
178+
json!({
179+
"type": "error",
180+
"message": msg.0
181+
})
182+
.to_string(),
183+
);
184+
}
185+
}
186+
187+
impl Handler<InfoMessage> for WsLogWatcher {
188+
type Result = ();
189+
190+
fn handle(
191+
&mut self,
192+
msg: InfoMessage,
193+
ctx: &mut Self::Context,
194+
) -> Self::Result {
195+
// Send the info message to the WebSocket client.
196+
ctx.text(
197+
json!({
198+
"type": "info",
199+
"message": msg.0
200+
})
201+
.to_string(),
202+
);
203+
}
204+
}
205+
206+
impl Handler<CloseConnection> for WsLogWatcher {
207+
type Result = ();
208+
209+
fn handle(
210+
&mut self,
211+
_: CloseConnection,
212+
ctx: &mut Self::Context,
213+
) -> Self::Result {
214+
// Stop watching and close the connection.
215+
self.stop_watching();
216+
ctx.close(None);
217+
}
218+
}
219+
220+
impl Handler<StopWatching> for WsLogWatcher {
221+
type Result = ();
222+
223+
fn handle(
224+
&mut self,
225+
_: StopWatching,
226+
ctx: &mut Self::Context,
227+
) -> Self::Result {
228+
// Stop watching the file but keep the connection open.
229+
self.stop_watching();
230+
ctx.text(
231+
json!({
232+
"type": "info",
233+
"message": "Stopped watching log file"
234+
})
235+
.to_string(),
236+
);
237+
}
238+
}
239+
240+
impl Handler<StoreWatcher> for WsLogWatcher {
241+
type Result = ();
242+
243+
fn handle(
244+
&mut self,
245+
msg: StoreWatcher,
246+
ctx: &mut Self::Context,
247+
) -> Self::Result {
248+
// Set up a task to read from the stream and send to WebSocket.
249+
let stream = Arc::new(Mutex::new(msg.0));
250+
self.file_watcher = Some(stream.clone());
251+
252+
// Spawn a task to read from the stream and send to WebSocket.
253+
let addr = ctx.address();
254+
tokio::spawn(async move {
255+
loop {
256+
// Lock the mutex only for a short time to get the next item.
257+
let content = {
258+
let mut stream_guard = stream.lock().await;
259+
stream_guard.next().await
260+
};
261+
262+
// Process the content outside the lock.
263+
match content {
264+
Some(Ok(data)) => {
265+
if addr.try_send(FileContent(data)).is_err() {
266+
break;
267+
}
268+
}
269+
Some(Err(e)) => {
270+
let _ = addr.try_send(ErrorMessage(e.to_string()));
271+
let _ = addr.try_send(CloseConnection);
272+
break;
273+
}
274+
None => {
275+
// If we exit the loop normally, the file watching has
276+
// ended.
277+
let _ = addr.try_send(CloseConnection);
278+
break;
279+
}
280+
}
281+
}
282+
});
283+
}
284+
}
285+
286+
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsLogWatcher {
287+
fn handle(
288+
&mut self,
289+
msg: Result<ws::Message, ws::ProtocolError>,
290+
ctx: &mut Self::Context,
291+
) {
292+
match msg {
293+
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
294+
Ok(ws::Message::Text(text)) => {
295+
// Try to parse the received message as JSON.
296+
if let Ok(json) =
297+
serde_json::from_str::<serde_json::Value>(&text)
298+
{
299+
if let Some(message_type) =
300+
json.get("type").and_then(|v| v.as_str())
301+
{
302+
match message_type {
303+
"stop" => {
304+
let stop_msg = StopWatching;
305+
<Self as Handler<StopWatching>>::handle(
306+
self, stop_msg, ctx,
307+
);
308+
}
309+
"set_app_base_dir" => {
310+
if let Some(app_base_dir) = json
311+
.get("app_base_dir")
312+
.and_then(|v| v.as_str())
313+
{
314+
// Check if we already have an app_base_dir.
315+
if self.app_base_dir.is_some() {
316+
ctx.text(
317+
json!({
318+
"type": "error",
319+
"message": "App base directory already set"
320+
})
321+
.to_string(),
322+
);
323+
} else {
324+
// Set the app base directory and start
325+
// watching.
326+
let set_app_base_dir_msg =
327+
SetAppBaseDir {
328+
app_base_dir: app_base_dir
329+
.to_string(),
330+
};
331+
<Self as Handler<SetAppBaseDir>>::handle(
332+
self, set_app_base_dir_msg, ctx,
333+
);
334+
}
335+
} else {
336+
ctx.text(
337+
json!({
338+
"type": "error",
339+
"message": "Missing app_base_dir field"
340+
})
341+
.to_string(),
342+
);
343+
}
344+
}
345+
_ => {
346+
// Unknown message type.
347+
ctx.text(
348+
json!({
349+
"type": "error",
350+
"message": "Unknown message type"
351+
})
352+
.to_string(),
353+
);
354+
}
355+
}
356+
}
357+
}
358+
}
359+
Ok(ws::Message::Binary(_)) => {
360+
// Binary messages are not expected.
361+
ctx.text(
362+
json!({
363+
"type": "error",
364+
"message": "Binary messages are not supported"
365+
})
366+
.to_string(),
367+
);
368+
}
369+
_ => (),
370+
}
371+
}
372+
}
373+
374+
// WebSocket endpoint handler
375+
pub async fn log_watcher_endpoint(
376+
req: HttpRequest,
377+
stream: web::Payload,
378+
_state: web::Data<Arc<DesignerState>>,
379+
) -> Result<HttpResponse, Error> {
380+
// Start the WebSocket connection.
381+
ws::start(WsLogWatcher::new(), &req, stream)
382+
}

core/src/ten_manager/src/designer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod frontend;
1717
pub mod graphs;
1818
pub mod help_text;
1919
pub mod locale;
20+
pub mod log_watcher;
2021
pub mod manifest;
2122
pub mod messages;
2223
pub mod metadata;
@@ -115,6 +116,7 @@ pub fn configure_routes(
115116
.service(web::resource("/ws/builtin-function").route(web::get().to(builtin_function::builtin_function_endpoint)))
116117
.service(web::resource("/ws/exec").route(web::get().to(exec::exec_endpoint)))
117118
.service(web::resource("/ws/terminal").route(web::get().to(terminal::ws_terminal_endpoint)))
119+
.service(web::resource("/ws/log-watcher").route(web::get().to(log_watcher::log_watcher_endpoint)))
118120
// Doc endpoints.
119121
.service(web::resource("/help-text").route(web::post().to(help_text::get_help_text_endpoint)))
120122
.service(web::resource("/doc-link").route(web::post().to(doc_link::get_doc_link_endpoint)))

0 commit comments

Comments
 (0)