Skip to content

Commit

Permalink
Impl logging
Browse files Browse the repository at this point in the history
  • Loading branch information
james58899 committed Sep 6, 2023
1 parent 6322fc1 commit bc3628d
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ filesize = "0.2"
filetime = "0.2"
futures = "0.3"
hex = "0.4"
log = "0.4"
log = { version = "0.4", features = ["std"] }
mime = "0.3"
openssl = { version = "*", features = ["vendored"] }
parking_lot = { version = "0.12", features = ["hardware-lock-elision", "deadlock_detection"] }
Expand Down
193 changes: 173 additions & 20 deletions src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,167 @@
use std::{
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use chrono::{SecondsFormat, Utc};
use log::{Level, LevelFilter, Metadata, Record, SetLoggerError};
use log::{info, Level, LevelFilter, Metadata, Record, SetLoggerError};
use tokio::{
fs::{rename, try_exists, File},
io::{AsyncWriteExt, BufWriter},
select,
sync::{
mpsc::{unbounded_channel, UnboundedSender},
Notify,
},
task::JoinHandle,
};

pub struct Logger {
config: Arc<LoggerConfig>,
shutdown: Arc<Notify>,
handle: JoinHandle<()>,
}

struct Logger;
pub struct LoggerConfig {
write_info: AtomicBool,
flush: AtomicBool,
}

static LOGGER: Logger = Logger;
struct LoggerWorker {
tx: UnboundedSender<LoggerMessage>,
}

pub fn init() -> Result<(), SetLoggerError> {
log::set_logger(&LOGGER).map(|()| log::set_max_level(LevelFilter::Debug))
struct LoggerMessage {
level: Level,
message: String,
}

impl log::Log for Logger {
impl Logger {
pub fn init<P: AsRef<Path>>(log_dir: P) -> Result<Self, SetLoggerError> {
let config = Arc::new(LoggerConfig {
write_info: true.into(),
flush: false.into(),
});

let (worker, shutdown, handle) = LoggerWorker::new(log_dir, config.clone());
let logger = Self {
config: config.clone(),
shutdown,
handle,
};

log::set_boxed_logger(Box::new(worker))
.map(|()| log::set_max_level(LevelFilter::Debug))
.map(|()| logger)
}

pub fn config(&self) -> Arc<LoggerConfig> {
self.config.clone()
}

pub async fn shutdown(self) {
info!("Shutdown logger...");
self.shutdown.notify_one();
let _ = self.handle.await;
}
}

impl LoggerConfig {
pub fn write_info(&self, enabled: bool) {
self.write_info.store(enabled, Ordering::Relaxed);
}

pub fn flush(&self, enabled: bool) {
self.flush.store(enabled, Ordering::Relaxed);
}
}

impl LoggerWorker {
fn new<P: AsRef<Path>>(log_dir: P, config: Arc<LoggerConfig>) -> (Self, Arc<Notify>, JoinHandle<()>) {
let shutdown = Arc::new(Notify::new());
let (tx, mut rx) = unbounded_channel::<LoggerMessage>();
let log_dir = log_dir.as_ref().to_owned();
let shutdown2 = shutdown.clone();
let worker = tokio::spawn(async move {
let log_out = &log_dir.join("log_out");
let log_err = &log_dir.join("log_err");

rotate_log(&[log_out, log_err]).await;

let mut err_lines: u32 = 0;
let mut out_lines: u32 = 0;
let mut writer_err = match std::fs::File::create(log_err).map(File::from_std).map(BufWriter::new) {
Ok(w) => w,
Err(err) => {
eprintln!("Log create error: {:?}", err);
return;
}
};
let mut writer_out = std::fs::File::create(log_out).map(File::from_std).map(BufWriter::new).unwrap();

loop {
select! {
log = rx.recv() => {
if log.is_none() {
let _ = writer_err.shutdown().await;
let _ = writer_out.shutdown().await;
break
}
let log = log.unwrap();

if log.level <= Level::Warn {
// stderr
eprintln!("{}", log.message);

// file
err_lines += 1;
if err_lines > 100000 {
// log routate
let _ = writer_err.shutdown().await;
rotate_log(&[log_err]).await;
writer_err = File::create(log_err).await.map(BufWriter::new).unwrap();
err_lines = 0;
}
let _ = writer_err.write_all(&[log.message.as_bytes(), b"\n"].concat()).await;
if config.flush.load(Ordering::Relaxed) {
let _ = writer_err.flush().await;
}
} else {
// stdout
println!("{}", log.message);

// file
if config.write_info.load(Ordering::Relaxed) {
out_lines += 1;
if out_lines > 100000 {
// log routate
let _ = writer_out.shutdown().await;
rotate_log(&[log_out]).await;
writer_out = File::create(log_out).await.map(BufWriter::new).unwrap();
out_lines = 0;
}
let _ = writer_out.write_all(&[log.message.as_bytes(), b"\n"].concat()).await;
if config.flush.load(Ordering::Relaxed) {
let _ = writer_out.flush().await;
}
}
}
}
_ = shutdown2.notified() => {
rx.close()
}
}
}
});

(Self { tx }, shutdown, worker)
}
}

impl log::Log for LoggerWorker {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Debug
}
Expand All @@ -21,26 +173,27 @@ impl log::Log for Logger {

let level = record.level();
let time = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
if level <= Level::Warn {
eprintln!(
"{} [{}/{}] {}",
time,
level.to_string().to_lowercase(),
record.target(),
record.args()
);
} else {
println!(
let _ = self.tx.send(LoggerMessage {
level,
message: format!(
"{} [{}/{}] {}",
time,
level.to_string().to_lowercase(),
record.target(),
record.args()
);
}

// TODO write log file
),
});
}

fn flush(&self) {}
}

async fn rotate_log(files: &[&PathBuf]) {
for path in files {
if try_exists(path).await.unwrap_or(false) {
let mut old = path.to_path_buf();
old.set_extension("old");
rename(path, old).await.unwrap();
}
}
}
28 changes: 22 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use log::{error, info, warn};
#[cfg(target_env = "msvc")]
use mimalloc::MiMalloc;
use openssl::{
ssl::{ClientHelloResponse, SslAcceptor, SslAcceptorBuilder, SslMethod, SslOptions}, pkcs12::ParsedPkcs12_2,
pkcs12::ParsedPkcs12_2,
ssl::{ClientHelloResponse, SslAcceptor, SslAcceptorBuilder, SslMethod, SslOptions},
};
use parking_lot::{Mutex, RwLock};
use tempfile::TempPath;
Expand All @@ -44,6 +45,7 @@ use tokio::{
use crate::{
cache_manager::{CacheFileInfo, CacheManager},
gallery_downloader::GalleryDownloader,
logger::Logger,
rpc::RPCClient,
util::{create_dirs, create_http_client},
};
Expand Down Expand Up @@ -87,6 +89,12 @@ struct Args {

#[arg(long)]
temp_dir: Option<String>,

#[arg(long)]
disable_logging: Option<bool>,

#[arg(long, default_value_t = false)]
flush_log: bool,
}

type DownloadState = RwLock<HashMap<[u8; 20], (Arc<TempPath>, watch::Receiver<u64>)>>;
Expand Down Expand Up @@ -119,7 +127,14 @@ async fn main() -> Result<(), Box<dyn Error>> {

create_dirs(vec![&data_dir, &log_dir, &cache_dir, &temp_dir, &download_dir]).await?;

init_logger();
// Init logger
let logger = Logger::init(log_dir).unwrap();
if args.disable_logging.unwrap_or(false) {
logger.config().write_info(false);
}
if args.flush_log {
logger.config().flush(true);
}

info!("Hentai@Home {} (Rust) starting up", CLIENT_VERSION);

Expand Down Expand Up @@ -176,6 +191,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client2 = client.clone();
let downloader = Arc::new(Mutex::new(None));
let downloader2 = downloader.clone();
let logger_config = logger.config();
tokio::spawn(async move {
let mut last_overload = Instant::now().checked_sub(Duration::from_secs(30)).unwrap_or_else(Instant::now);
while let Some(command) = rx.recv().await {
Expand All @@ -198,6 +214,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
Command::RefreshSettings => {
client2.refresh_settings().await;
if args.disable_logging.is_none() {
logger_config.write_info(!client2.settings().disable_logging());
}
}
Command::StartDownloader => {
let mut downloader = downloader2.lock();
Expand Down Expand Up @@ -262,16 +281,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Shutdown in progress - please wait");
sleep(Duration::from_secs(30)).await;
server_handle.stop(true).await;
logger.shutdown().await;
Ok(())
}

/**
* main helper
*/
fn init_logger() {
logger::init().unwrap();
}

async fn read_credential<P: AsRef<Path>>(data_path: P) -> Option<(i32, String)> {
let path = data_path.as_ref().join("client_login");
let mut file = File::open(path.clone()).map_ok(|f| BufReader::new(f).lines()).await.ok()?; // TODO better error handle
Expand Down
10 changes: 10 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct RPCClient {
pub struct Settings {
size_limit: AtomicU64,
throttle_bytes: AtomicU64,
disable_logging: AtomicBool,
}

pub struct InitSettings {
Expand Down Expand Up @@ -84,6 +85,10 @@ impl Settings {
20 + min(480, self.throttle_bytes.load(Ordering::Relaxed) / 10000)
}

pub fn disable_logging(&self) -> bool {
self.disable_logging.load(Ordering::Relaxed)
}

fn update(&self, settings: HashMap<String, String>) {
if let Some(size) = settings.get("disklimit_bytes").and_then(|s| s.parse().ok()) {
self.size_limit.store(size, Ordering::Relaxed);
Expand All @@ -93,6 +98,10 @@ impl Settings {
self.throttle_bytes.store(size, Ordering::Relaxed);
}

if let Some(disabled) = settings.get("disable_logging").and_then(|s| s.parse().ok()) {
self.disable_logging.store(disabled, Ordering::Relaxed);
}

// TODO update other settings
}
}
Expand All @@ -110,6 +119,7 @@ impl RPCClient {
settings: Arc::new(Settings {
size_limit: AtomicU64::new(u64::MAX),
throttle_bytes: AtomicU64::new(0),
disable_logging: AtomicBool::new(false),
}),
}
}
Expand Down

0 comments on commit bc3628d

Please sign in to comment.