Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds flag --history-only to "hq journal stream" #841

Merged
merged 2 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions crates/hyperqueue/src/client/commands/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::common::utils::str::pluralize;
use crate::rpc_call;
use crate::server::bootstrap::get_client_session;
use crate::server::event::journal::JournalReader;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
use anyhow::anyhow;
use clap::{Parser, ValueHint};
use std::io::{BufWriter, Write};
Expand All @@ -25,9 +25,14 @@ enum JournalCommand {
/// Events will be exported to `stdout`, you can redirect it e.g. to a file.
Export(ExportOpts),

/// Live stream events from the server.
/// Stream events from a running server; it first replays old events,
/// then waits for new live events.
Stream,

/// Stream events from a running server; it replays old events
/// and then terminates the connection.
Replay,

/// Connect to a server and remove completed tasks and non-active workers from journal
Prune,

Expand All @@ -46,17 +51,20 @@ struct ExportOpts {
pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> anyhow::Result<()> {
match opts.command {
JournalCommand::Export(opts) => export_json(opts),
JournalCommand::Stream => stream_json(gsettings).await,
JournalCommand::Replay => stream_json(gsettings, false).await,
JournalCommand::Stream => stream_json(gsettings, true).await,
JournalCommand::Prune => prune_journal(gsettings).await,
JournalCommand::Flush => flush_journal(gsettings).await,
}
}

async fn stream_json(gsettings: &GlobalSettings) -> anyhow::Result<()> {
async fn stream_json(gsettings: &GlobalSettings, live_events: bool) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
connection
.connection()
.send(FromClientMessage::StreamEvents)
.send(FromClientMessage::StreamEvents(StreamEvents {
live_events,
}))
.await?;
let stdout = std::io::stdout();
let stdout = stdout.lock();
Expand Down
6 changes: 4 additions & 2 deletions crates/hyperqueue/src/dashboard/data/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::server::event::Event;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
use std::time::Duration;
use tokio::sync::mpsc::Sender;

Expand All @@ -10,7 +10,9 @@ pub async fn create_data_fetch_process(
) -> anyhow::Result<()> {
session
.connection()
.send(FromClientMessage::StreamEvents)
.send(FromClientMessage::StreamEvents(StreamEvents {
live_events: true,
}))
.await?;

const CAPACITY: usize = 1024;
Expand Down
38 changes: 25 additions & 13 deletions crates/hyperqueue/src/server/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,26 @@ async fn handle_client(
Ok(())
}

async fn stream_events<
Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static,
Rx: Stream<Item = crate::Result<FromClientMessage>> + Unpin,
>(
async fn stream_history_events<Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static>(
tx: &mut Tx,
rx: &mut Rx,
mut history: mpsc::UnboundedReceiver<Event>,
mut current: mpsc::UnboundedReceiver<Event>,
) {
log::debug!("Resending history started");
while let Some(e) = history.recv().await {
if tx.send(ToClientMessage::Event(e)).await.is_err() {
return;
}
}
}

async fn stream_events<
Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static,
Rx: Stream<Item = crate::Result<FromClientMessage>> + Unpin,
>(
tx: &mut Tx,
rx: &mut Rx,
mut current: mpsc::UnboundedReceiver<Event>,
) {
log::debug!("History streaming completed");
loop {
let r = tokio::select! {
Expand Down Expand Up @@ -164,18 +169,25 @@ pub async fn client_rpc_loop<
FromClientMessage::CloseJob(msg) => {
handle_job_close(&state_ref, senders, &msg.selector).await
}
FromClientMessage::StreamEvents => {
FromClientMessage::StreamEvents(msg) => {
log::debug!("Start streaming events to client");
/* We create two event queues: one for historic events and one for live events.
While the historic events are loaded from the file and streamed, live events are
already being collected, and they are sent immediately after the historic events are sent */
let (tx1, rx1) = mpsc::unbounded_channel::<Event>();
let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
let listener_id = senders.events.register_listener(tx1, tx2);

stream_events(&mut tx, &mut rx, rx1, rx2).await;

senders.events.unregister_listener(listener_id);
let live = if msg.live_events {
let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
let listener_id = senders.events.register_listener(tx2);
Some((rx2, listener_id))
} else {
None
};
senders.events.replay_journal(tx1);
stream_history_events(&mut tx, rx1).await;
if let Some((rx2, listener_id)) = live {
stream_events(&mut tx, &mut rx, rx2).await;
senders.events.unregister_listener(listener_id);
}
break;
}
FromClientMessage::ServerInfo => {
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/server/event/journal/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::sync::mpsc;

pub enum EventStreamMessage {
Event(Event),
RegisterListener(mpsc::UnboundedSender<Event>),
ReplayJournal(mpsc::UnboundedSender<Event>),
PruneJournal {
callback: tokio::sync::oneshot::Sender<()>,
live_jobs: Set<JobId>,
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn streaming_process(
break
}
}
Some(EventStreamMessage::RegisterListener(tx)) => {
Some(EventStreamMessage::ReplayJournal(tx)) => {
/* We are blocking the thread here, but it is intentional.
We are blocking only the thread that manages the log file, not the whole HQ.
And while this read is performed, we cannot allow modification of the file,
Expand Down
26 changes: 13 additions & 13 deletions crates/hyperqueue/src/server/event/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,19 @@ impl EventStreamer {
});
}

pub fn register_listener(
&self,
history_sender: mpsc::UnboundedSender<Event>,
current_sender: mpsc::UnboundedSender<Event>,
) -> u32 {
pub fn replay_journal(&self, history_sender: mpsc::UnboundedSender<Event>) {
let inner = self.inner.get();
if let Some(ref streamer) = inner.storage_sender {
if streamer
.send(EventStreamMessage::ReplayJournal(history_sender))
.is_err()
{
log::error!("Event streaming queue has been closed.");
}
}
}

pub fn register_listener(&self, current_sender: mpsc::UnboundedSender<Event>) -> u32 {
let mut inner = self.inner.get_mut();
let listener_id = inner
.client_listeners
Expand All @@ -183,14 +191,6 @@ impl EventStreamer {
.unwrap_or(0)
+ 1;
inner.client_listeners.push((current_sender, listener_id));
if let Some(ref streamer) = inner.storage_sender {
if streamer
.send(EventStreamMessage::RegisterListener(history_sender))
.is_err()
{
log::error!("Event streaming queue has been closed.");
}
}
listener_id
}

Expand Down
7 changes: 6 additions & 1 deletion crates/hyperqueue/src/transfer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum FromClientMessage {
// This command switches the connection into streaming connection,
// it will no longer react to any other client messages
// and client will only receive ToClientMessage::Event
StreamEvents,
StreamEvents(StreamEvents),
PruneJournal,
FlushJournal,
}
Expand Down Expand Up @@ -86,6 +86,11 @@ pub struct TaskKindProgram {
pub task_dir: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StreamEvents {
pub live_events: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TaskKind {
ExternalProgram(TaskKindProgram),
Expand Down
6 changes: 4 additions & 2 deletions tests/autoalloc/test_autoalloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ def test_slurm_queue_sbatch_additional_output(hq_env: HqEnv):
class Handler(SlurmCommandHandler):
async def handle_submit(self, input: CommandInput) -> CommandOutput:
output = await super().handle_submit(input)
return response(f"""
return response(
f"""
No reservation for this job
--> Verifying valid submit host (login)...OK
--> Verifying valid jobname...OK
--> Verifying valid ssh keys...OK
--> Verifying access to desired queue (normal)...OK
--> Checking available allocation...OK
{output.stdout}
""")
"""
)

with MockJobManager(hq_env, Handler(DefaultManager())):
hq_env.start_server()
Expand Down
10 changes: 9 additions & 1 deletion tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ def test_worker_stream_events2(hq_env: HqEnv, tmp_path):
assert events[2]["event"]["desc"]["name"] == "uname"


def test_worker_journal_replay(hq_env: HqEnv, tmp_path):
journal = tmp_path.joinpath("test.journal")
hq_env.start_server(args=["--journal", journal])
r = hq_env.command(["journal", "replay"])
msg = json.loads(r)
assert msg["event"]["type"] == "server-start"


def test_worker_connected_event(hq_env: HqEnv):
def body():
hq_env.start_worker()
Expand Down Expand Up @@ -168,7 +176,7 @@ def body():
}
}
print(json.dumps(data))
""",
""",
):
hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"])
wait_for_worker_state(hq_env, 1, "RUNNING")
Expand Down
Loading