diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs index 7286da1e6..49f5708ea 100644 --- a/crates/hyperqueue/src/bin/hq.rs +++ b/crates/hyperqueue/src/bin/hq.rs @@ -2,10 +2,6 @@ use clap::{CommandFactory, FromArgMatches}; use clap_complete::generate; use cli_table::ColorChoice; use colored::Colorize; -use std::io; -use std::io::IsTerminal; -use std::panic::PanicHookInfo; - use hyperqueue::HQ_VERSION; use hyperqueue::client::commands::autoalloc::command_autoalloc; use hyperqueue::client::commands::doc::command_doc; @@ -16,7 +12,7 @@ use hyperqueue::client::commands::job::{ }; use hyperqueue::client::commands::journal::command_journal; use hyperqueue::client::commands::outputlog::command_reader; -use hyperqueue::client::commands::server::command_server; +use hyperqueue::client::commands::server::{ServerCommand, ServerOpts, command_server}; use hyperqueue::client::commands::submit::command::{SubmitJobConfOpts, open_job}; use hyperqueue::client::commands::submit::{ JobSubmitFileOpts, JobSubmitOpts, submit_computation, submit_computation_from_job_file, @@ -40,7 +36,7 @@ use hyperqueue::client::task::{ use hyperqueue::common::cli::{ ColorPolicy, CommonOpts, DeploySshOpts, GenerateCompletionOpts, HwDetectOpts, JobCommand, JobProgressOpts, JobWaitOpts, OptsWithMatches, RootOptions, SubCommand, WorkerAddressOpts, - WorkerCommand, WorkerInfoOpts, WorkerListOpts, WorkerStopOpts, WorkerWaitOpts, + WorkerCommand, WorkerInfoOpts, WorkerListOpts, WorkerOpts, WorkerStopOpts, WorkerWaitOpts, get_task_id_selector, get_task_selector, }; use hyperqueue::common::setup::setup_logging; @@ -52,6 +48,10 @@ use hyperqueue::transfer::messages::{ use hyperqueue::worker::hwdetect::{ detect_additional_resources, detect_cpus, prune_hyper_threading, }; +use nix::sys::signal::{SigHandler, Signal}; +use std::io; +use std::io::IsTerminal; +use std::panic::PanicHookInfo; use tako::resources::{CPU_RESOURCE_NAME, ResourceDescriptor, ResourceDescriptorItem}; #[cfg(feature = "jemalloc")] @@ -382,6 +382,17 @@ environment variable, and attach the logs to the issue, to provide us more infor }; } +#[cfg(unix)] +fn reset_sigpipe() { + unsafe { + nix::sys::signal::signal(Signal::SIGPIPE, SigHandler::SigDfl) + .expect("cannot reset sigpipe"); + } +} + +#[cfg(not(unix))] +fn reset_sigpipe() {} + #[tokio::main(flavor = "current_thread")] async fn main() -> hyperqueue::Result<()> { // Augment panics - first print the error and backtrace like normally, @@ -419,6 +430,35 @@ async fn main() -> hyperqueue::Result<()> { let gsettings = make_global_settings(top_opts.common); + let is_cli_like = match &top_opts.subcmd { + SubCommand::Server(ServerOpts { + subcmd: ServerCommand::Start(_), + }) => false, + SubCommand::Worker(WorkerOpts { + subcmd: WorkerCommand::Start(_), + }) => false, + #[cfg(feature = "dashboard")] + SubCommand::Dashboard(_) => false, + _ => true, + }; + + if is_cli_like { + // When our stdout is attached to a pipe and the pipe is closed, + // it manifests as an I/O error, because the Rust runtime ignores + // SIGPIPE by default. + // This in turn causes `println!` to panic, which is not ideal, + // because it crashes HQ when used with Unix CLI utilities (such as `head`). + // Therefore, we reset SIGPIPE to its default behavior (terminate the process) + // to avoid the panics. + // See https://github.com/It4innovations/hyperqueue/issues/851. + // However, we only do this for client commands, which are short running and + // designed to be combined with other CLI tools. + // Enabling this for server and workers has unintended consequences, for example + // when a worker writes stdin to a task and the task has closed its stdin, then + // this would terminate the worker. + reset_sigpipe(); + } + let result = match top_opts.subcmd { SubCommand::Server(opts) => command_server(&gsettings, opts).await, SubCommand::Worker(opts) => match opts.subcmd { diff --git a/crates/hyperqueue/src/client/commands/server.rs b/crates/hyperqueue/src/client/commands/server.rs index 73c737895..344d57768 100644 --- a/crates/hyperqueue/src/client/commands/server.rs +++ b/crates/hyperqueue/src/client/commands/server.rs @@ -19,7 +19,7 @@ use std::time::Duration; #[derive(Parser)] pub struct ServerOpts { #[clap(subcommand)] - subcmd: ServerCommand, + pub subcmd: ServerCommand, } #[derive(Parser)] @@ -57,7 +57,7 @@ pub struct GenerateAccessOpts { } #[derive(Parser)] -enum ServerCommand { +pub enum ServerCommand { /// Start the HyperQueue server Start(ServerStartOpts), /// Stop the HyperQueue server, if it is running @@ -69,7 +69,7 @@ enum ServerCommand { } #[derive(Parser)] -struct ServerStartOpts { +pub struct ServerStartOpts { /// Hostname/IP of the machine under which is visible to others, default: hostname #[arg(long)] host: Option, @@ -119,10 +119,10 @@ struct ServerStartOpts { } #[derive(Parser)] -struct ServerStopOpts {} +pub struct ServerStopOpts {} #[derive(Parser)] -struct ServerInfoOpts {} +pub struct ServerInfoOpts {} pub async fn command_server(gsettings: &GlobalSettings, opts: ServerOpts) -> anyhow::Result<()> { match opts.subcmd {