|
1 |
| -use std::cell::Cell; |
2 |
| -use std::collections::HashMap; |
3 |
| -use std::rc::Rc; |
4 |
| -use std::sync::{Arc, Mutex, Weak}; |
5 |
| -use std::thread; |
6 |
| - |
7 |
| -use ::pipewire::{ |
8 |
| - context::Context, keys, main_loop::MainLoop, properties::properties, spa::utils::dict::DictRef, |
9 |
| - types::ObjectType, |
10 |
| -}; |
11 | 1 | use itertools::Itertools as _;
|
12 |
| -use tokio::sync::Notify; |
| 2 | +use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; |
13 | 3 |
|
14 | 4 | use super::*;
|
15 |
| - |
16 |
| -static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new); |
17 |
| - |
18 |
| -#[derive(Debug)] |
19 |
| -struct Node { |
20 |
| - name: String, |
21 |
| - nick: Option<String>, |
22 |
| - media_class: Option<String>, |
23 |
| - media_role: Option<String>, |
24 |
| - description: Option<String>, |
25 |
| -} |
26 |
| - |
27 |
| -impl Node { |
28 |
| - fn new(global_id: u32, global_props: &DictRef) -> Self { |
29 |
| - Self { |
30 |
| - name: global_props |
31 |
| - .get(&keys::NODE_NAME) |
32 |
| - .map_or_else(|| format!("node_{}", global_id), |s| s.to_string()), |
33 |
| - nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()), |
34 |
| - media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()), |
35 |
| - media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()), |
36 |
| - description: global_props |
37 |
| - .get(&keys::NODE_DESCRIPTION) |
38 |
| - .map(|s| s.to_string()), |
39 |
| - } |
40 |
| - } |
41 |
| -} |
42 |
| - |
43 |
| -#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)] |
44 |
| -struct Link { |
45 |
| - link_output_node: u32, |
46 |
| - link_input_node: u32, |
47 |
| -} |
48 |
| - |
49 |
| -impl Link { |
50 |
| - fn new(global_props: &DictRef) -> Option<Self> { |
51 |
| - if let (Some(link_output_node), Some(link_input_node)) = ( |
52 |
| - global_props |
53 |
| - .get(&keys::LINK_OUTPUT_NODE) |
54 |
| - .and_then(|s| s.parse().ok()), |
55 |
| - global_props |
56 |
| - .get(&keys::LINK_INPUT_NODE) |
57 |
| - .and_then(|s| s.parse().ok()), |
58 |
| - ) { |
59 |
| - Some(Self { |
60 |
| - link_output_node, |
61 |
| - link_input_node, |
62 |
| - }) |
63 |
| - } else { |
64 |
| - None |
65 |
| - } |
66 |
| - } |
67 |
| -} |
68 |
| - |
69 |
| -#[derive(Default)] |
70 |
| -struct Data { |
71 |
| - nodes: HashMap<u32, Node>, |
72 |
| - links: HashMap<u32, Link>, |
73 |
| -} |
74 |
| - |
75 |
| -#[derive(Default)] |
76 |
| -struct Client { |
77 |
| - event_listeners: Mutex<Vec<Weak<Notify>>>, |
78 |
| - data: Mutex<Data>, |
79 |
| -} |
80 |
| - |
81 |
| -impl Client { |
82 |
| - fn new() -> Result<Client> { |
83 |
| - thread::Builder::new() |
84 |
| - .name("privacy_pipewire".to_string()) |
85 |
| - .spawn(Client::main_loop_thread) |
86 |
| - .error("failed to spawn a thread")?; |
87 |
| - |
88 |
| - Ok(Client::default()) |
89 |
| - } |
90 |
| - |
91 |
| - fn main_loop_thread() { |
92 |
| - let client = CLIENT.as_ref().error("Could not get client").unwrap(); |
93 |
| - |
94 |
| - let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")}; |
95 |
| - |
96 |
| - let main_loop = MainLoop::new(None).expect("Failed to create main loop"); |
97 |
| - |
98 |
| - let context = |
99 |
| - Context::with_properties(&main_loop, proplist).expect("Failed to create context"); |
100 |
| - let core = context.connect(None).expect("Failed to connect"); |
101 |
| - let registry = core.get_registry().expect("Failed to get registry"); |
102 |
| - |
103 |
| - let updated = Rc::new(Cell::new(false)); |
104 |
| - let updated_copy = updated.clone(); |
105 |
| - let updated_copy2 = updated.clone(); |
106 |
| - |
107 |
| - // Register a callback to the `global` event on the registry, which notifies of any new global objects |
108 |
| - // appearing on the remote. |
109 |
| - // The callback will only get called as long as we keep the returned listener alive. |
110 |
| - let _registry_listener = registry |
111 |
| - .add_listener_local() |
112 |
| - .global(move |global| { |
113 |
| - let Some(global_props) = global.props else { |
114 |
| - return; |
115 |
| - }; |
116 |
| - match &global.type_ { |
117 |
| - ObjectType::Node => { |
118 |
| - client |
119 |
| - .data |
120 |
| - .lock() |
121 |
| - .unwrap() |
122 |
| - .nodes |
123 |
| - .insert(global.id, Node::new(global.id, global_props)); |
124 |
| - updated_copy.set(true); |
125 |
| - } |
126 |
| - ObjectType::Link => { |
127 |
| - let Some(link) = Link::new(global_props) else { |
128 |
| - return; |
129 |
| - }; |
130 |
| - client.data.lock().unwrap().links.insert(global.id, link); |
131 |
| - updated_copy.set(true); |
132 |
| - } |
133 |
| - _ => (), |
134 |
| - } |
135 |
| - }) |
136 |
| - .global_remove(move |uid| { |
137 |
| - let mut data = client.data.lock().unwrap(); |
138 |
| - if data.nodes.remove(&uid).is_some() || data.links.remove(&uid).is_some() { |
139 |
| - updated_copy2.set(true); |
140 |
| - } |
141 |
| - }) |
142 |
| - .register(); |
143 |
| - |
144 |
| - loop { |
145 |
| - main_loop.loop_().iterate(Duration::from_secs(60 * 60 * 24)); |
146 |
| - if updated.get() { |
147 |
| - updated.set(false); |
148 |
| - client |
149 |
| - .event_listeners |
150 |
| - .lock() |
151 |
| - .unwrap() |
152 |
| - .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some()); |
153 |
| - } |
154 |
| - } |
155 |
| - } |
156 |
| - |
157 |
| - fn add_event_listener(&self, notify: &Arc<Notify>) { |
158 |
| - self.event_listeners |
159 |
| - .lock() |
160 |
| - .unwrap() |
161 |
| - .push(Arc::downgrade(notify)); |
162 |
| - } |
163 |
| -} |
| 5 | +use crate::pipewire::{CLIENT, EventKind, Link, Node}; |
164 | 6 |
|
165 | 7 | #[derive(Deserialize, Debug, SmartDefault)]
|
166 | 8 | #[serde(rename_all = "lowercase", deny_unknown_fields, default)]
|
@@ -191,15 +33,18 @@ impl NodeDisplay {
|
191 | 33 |
|
192 | 34 | pub(super) struct Monitor<'a> {
|
193 | 35 | config: &'a Config,
|
194 |
| - notify: Arc<Notify>, |
| 36 | + updates: UnboundedReceiver<EventKind>, |
195 | 37 | }
|
196 | 38 |
|
197 | 39 | impl<'a> Monitor<'a> {
|
198 | 40 | pub(super) async fn new(config: &'a Config) -> Result<Self> {
|
199 | 41 | let client = CLIENT.as_ref().error("Could not get client")?;
|
200 |
| - let notify = Arc::new(Notify::new()); |
201 |
| - client.add_event_listener(¬ify); |
202 |
| - Ok(Self { config, notify }) |
| 42 | + let (tx, rx) = unbounded_channel(); |
| 43 | + client.add_event_listener(tx); |
| 44 | + Ok(Self { |
| 45 | + config, |
| 46 | + updates: rx, |
| 47 | + }) |
203 | 48 | }
|
204 | 49 | }
|
205 | 50 |
|
@@ -260,7 +105,16 @@ impl PrivacyMonitor for Monitor<'_> {
|
260 | 105 | }
|
261 | 106 |
|
262 | 107 | async fn wait_for_change(&mut self) -> Result<()> {
|
263 |
| - self.notify.notified().await; |
| 108 | + while let Some(event) = self.updates.recv().await { |
| 109 | + if event.intersects( |
| 110 | + EventKind::NODE_ADDED |
| 111 | + | EventKind::NODE_REMOVED |
| 112 | + | EventKind::LINK_ADDED |
| 113 | + | EventKind::LINK_REMOVED, |
| 114 | + ) { |
| 115 | + break; |
| 116 | + } |
| 117 | + } |
264 | 118 | Ok(())
|
265 | 119 | }
|
266 | 120 | }
|
0 commit comments