Skip to content

Add pipewire sound driver #2166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rustdoc-args = ["--cfg", "docsrs"]
async-trait = "0.1"
backon = { version = "1.2", default-features = false, features = ["tokio-sleep"] }
base64 = { version = "0.22.1" }
bitflags = "2.9"
bytes = "1.8"
calibright = { version = "0.1.13", features = ["watch"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "unstable-locales"] }
Expand Down
4 changes: 3 additions & 1 deletion cspell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ words:
- bugz
- busctl
- caldav
- cbrt
- ccache
- chrono
- clippy
Expand Down Expand Up @@ -86,6 +87,7 @@ words:
- kibi
- kmon
- libc
- libdbus
- liquidctl
- locid
- macchiato
Expand Down Expand Up @@ -187,8 +189,8 @@ words:
- xclip
- xcolors
- xesam
- xkbswitch
- xkbevent
- xkbswitch
- XKCD
- xrandr
- xresources
Expand Down
184 changes: 19 additions & 165 deletions src/blocks/privacy/pipewire.rs
Original file line number Diff line number Diff line change
@@ -1,166 +1,8 @@
use std::cell::Cell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Weak};
use std::thread;

use ::pipewire::{
context::Context, keys, main_loop::MainLoop, properties::properties, spa::utils::dict::DictRef,
types::ObjectType,
};
use itertools::Itertools as _;
use tokio::sync::Notify;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};

use super::*;

static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new);

#[derive(Debug)]
struct Node {
name: String,
nick: Option<String>,
media_class: Option<String>,
media_role: Option<String>,
description: Option<String>,
}

impl Node {
fn new(global_id: u32, global_props: &DictRef) -> Self {
Self {
name: global_props
.get(&keys::NODE_NAME)
.map_or_else(|| format!("node_{global_id}"), |s| s.to_string()),
nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()),
media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()),
media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()),
description: global_props
.get(&keys::NODE_DESCRIPTION)
.map(|s| s.to_string()),
}
}
}

#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
struct Link {
link_output_node: u32,
link_input_node: u32,
}

impl Link {
fn new(global_props: &DictRef) -> Option<Self> {
if let (Some(link_output_node), Some(link_input_node)) = (
global_props
.get(&keys::LINK_OUTPUT_NODE)
.and_then(|s| s.parse().ok()),
global_props
.get(&keys::LINK_INPUT_NODE)
.and_then(|s| s.parse().ok()),
) {
Some(Self {
link_output_node,
link_input_node,
})
} else {
None
}
}
}

#[derive(Default)]
struct Data {
nodes: HashMap<u32, Node>,
links: HashMap<u32, Link>,
}

#[derive(Default)]
struct Client {
event_listeners: Mutex<Vec<Weak<Notify>>>,
data: Mutex<Data>,
}

impl Client {
fn new() -> Result<Client> {
thread::Builder::new()
.name("privacy_pipewire".to_string())
.spawn(Client::main_loop_thread)
.error("failed to spawn a thread")?;

Ok(Client::default())
}

fn main_loop_thread() {
let client = CLIENT.as_ref().error("Could not get client").unwrap();

let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")};

let main_loop = MainLoop::new(None).expect("Failed to create main loop");

let context =
Context::with_properties(&main_loop, proplist).expect("Failed to create context");
let core = context.connect(None).expect("Failed to connect");
let registry = core.get_registry().expect("Failed to get registry");

let updated = Rc::new(Cell::new(false));
let updated_copy = updated.clone();
let updated_copy2 = updated.clone();

// Register a callback to the `global` event on the registry, which notifies of any new global objects
// appearing on the remote.
// The callback will only get called as long as we keep the returned listener alive.
let _registry_listener = registry
.add_listener_local()
.global(move |global| {
let Some(global_props) = global.props else {
return;
};
match &global.type_ {
ObjectType::Node => {
client
.data
.lock()
.unwrap()
.nodes
.insert(global.id, Node::new(global.id, global_props));
updated_copy.set(true);
}
ObjectType::Link => {
let Some(link) = Link::new(global_props) else {
return;
};
client.data.lock().unwrap().links.insert(global.id, link);
updated_copy.set(true);
}
_ => (),
}
})
.global_remove(move |uid| {
let mut data = client.data.lock().unwrap();
if data.nodes.remove(&uid).is_some() || data.links.remove(&uid).is_some() {
updated_copy2.set(true);
}
})
.register();

loop {
main_loop.loop_().iterate(Duration::from_secs(60 * 60 * 24));
if updated.get() {
updated.set(false);
client
.event_listeners
.lock()
.unwrap()
.retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some());
}
}
}

fn add_event_listener(&self, notify: &Arc<Notify>) {
self.event_listeners
.lock()
.unwrap()
.push(Arc::downgrade(notify));
}
}
use crate::pipewire::{CLIENT, EventKind, Link, Node};

#[derive(Deserialize, Debug, SmartDefault)]
#[serde(rename_all = "lowercase", deny_unknown_fields, default)]
Expand Down Expand Up @@ -191,15 +33,18 @@ impl NodeDisplay {

pub(super) struct Monitor<'a> {
config: &'a Config,
notify: Arc<Notify>,
updates: UnboundedReceiver<EventKind>,
}

impl<'a> Monitor<'a> {
pub(super) async fn new(config: &'a Config) -> Result<Self> {
let client = CLIENT.as_ref().error("Could not get client")?;
let notify = Arc::new(Notify::new());
client.add_event_listener(&notify);
Ok(Self { config, notify })
let (tx, rx) = unbounded_channel();
client.add_event_listener(tx);
Ok(Self {
config,
updates: rx,
})
}
}

Expand Down Expand Up @@ -260,7 +105,16 @@ impl PrivacyMonitor for Monitor<'_> {
}

async fn wait_for_change(&mut self) -> Result<()> {
self.notify.notified().await;
while let Some(event) = self.updates.recv().await {
if event.intersects(
EventKind::NODE_ADDED
| EventKind::NODE_REMOVED
| EventKind::LINK_ADDED
| EventKind::LINK_REMOVED,
) {
break;
}
}
Ok(())
}
}
43 changes: 25 additions & 18 deletions src/blocks/sound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//!
//! Key | Values | Default
//! ----|--------|--------
//! `driver` | `"auto"`, `"pulseaudio"`, `"alsa"`. | `"auto"` (Pulseaudio with ALSA fallback)
//! `driver` | `"auto"`, `pipewire`, `"pulseaudio"`, `"alsa"`. | `"auto"` (Pipewire with Pulseaudio fallback with ALSA fallback)
//! `format` | A string to customise the output of this block. See below for available placeholders. | <code>\" $icon {$volume.eng(w:2) \|}\"</code>
//! `format_alt` | If set, block will switch between `format` and `format_alt` on every click. | `None`
//! `name` | PulseAudio device name, or the ALSA control name as found in the output of `amixer -D yourdevice scontrols`. | PulseAudio: `@DEFAULT_SINK@` / ALSA: `Master`
Expand Down Expand Up @@ -92,7 +92,11 @@
//! - `volume` (as a progression)
//! - `headphones`

make_log_macro!(debug, "sound");

mod alsa;
#[cfg(feature = "pipewire")]
pub mod pipewire;
#[cfg(feature = "pulseaudio")]
mod pulseaudio;

Expand All @@ -101,8 +105,6 @@ use crate::wrappers::SerdeRegex;
use indexmap::IndexMap;
use regex::Regex;

make_log_macro!(debug, "sound");

#[derive(Deserialize, Debug, SmartDefault)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
Expand Down Expand Up @@ -188,29 +190,32 @@ pub async fn run(config: &Config, api: &CommonApi) -> Result<()> {
config.device.clone().unwrap_or_else(|| "default".into()),
config.natural_mapping,
)?),
#[cfg(feature = "pipewire")]
SoundDriver::Pipewire => {
Box::new(pipewire::Device::new(config.device_kind, config.name.clone()).await?)
}
#[cfg(feature = "pulseaudio")]
SoundDriver::PulseAudio => Box::new(pulseaudio::Device::new(
config.device_kind,
config.name.clone(),
)?),
#[cfg(feature = "pulseaudio")]
SoundDriver::Auto => {
SoundDriver::Auto => 'blk: {
#[cfg(feature = "pipewire")]
if let Ok(pipewire) =
pipewire::Device::new(config.device_kind, config.name.clone()).await
{
break 'blk Box::new(pipewire);
}
#[cfg(feature = "pulseaudio")]
if let Ok(pulse) = pulseaudio::Device::new(config.device_kind, config.name.clone()) {
Box::new(pulse)
} else {
Box::new(alsa::Device::new(
config.name.clone().unwrap_or_else(|| "Master".into()),
config.device.clone().unwrap_or_else(|| "default".into()),
config.natural_mapping,
)?)
break 'blk Box::new(pulse);
}
Box::new(alsa::Device::new(
config.name.clone().unwrap_or_else(|| "Master".into()),
config.device.clone().unwrap_or_else(|| "default".into()),
config.natural_mapping,
)?)
}
#[cfg(not(feature = "pulseaudio"))]
SoundDriver::Auto => Box::new(alsa::Device::new(
config.name.clone().unwrap_or_else(|| "Master".into()),
config.device.clone().unwrap_or_else(|| "default".into()),
config.natural_mapping,
)?),
};

let mappings = match &config.mappings {
Expand Down Expand Up @@ -330,6 +335,8 @@ pub enum SoundDriver {
#[default]
Auto,
Alsa,
#[cfg(feature = "pipewire")]
Pipewire,
#[cfg(feature = "pulseaudio")]
PulseAudio,
}
Expand Down
Loading