diff --git a/ntfy-daemon/src/actor_utils.rs b/ntfy-daemon/src/actor_utils.rs index 7c00a81..cbbdce4 100644 --- a/ntfy-daemon/src/actor_utils.rs +++ b/ntfy-daemon/src/actor_utils.rs @@ -1,13 +1,13 @@ macro_rules! send_command { ($self:expr, $command:expr) => {{ let (resp_tx, rx) = oneshot::channel(); + use anyhow::Context; $self .command_tx .send($command(resp_tx)) .await - .map_err(|_| anyhow::anyhow!("Actor mailbox error"))?; - rx.await - .map_err(|_| anyhow::anyhow!("Actor response error"))? + .context("Actor mailbox error")?; + rx.await.context("Actor response error")? }}; } diff --git a/ntfy-daemon/src/ntfy.rs b/ntfy-daemon/src/ntfy.rs index a5f0e5c..7407f2f 100644 --- a/ntfy-daemon/src/ntfy.rs +++ b/ntfy-daemon/src/ntfy.rs @@ -3,7 +3,9 @@ use crate::models::NullNetworkMonitor; use crate::models::NullNotifier; use anyhow::{anyhow, Context}; use futures::future::join_all; +use futures::StreamExt; use std::{collections::HashMap, future::Future, sync::Arc}; +use tokio::select; use tokio::{ sync::{broadcast, mpsc, oneshot, RwLock}, task::{spawn_local, LocalSet}, @@ -135,85 +137,89 @@ impl NtfyActor { } pub async fn run(&mut self) { - while let Some(msg) = self.command_rx.recv().await { - match msg { - NtfyCommand::Subscribe { - server, - topic, - resp_tx, - } => { - let result = self.handle_subscribe(server, topic).await; - let _ = resp_tx.send(result); - } + let mut network_change_stream = self.env.network_monitor.listen(); + loop { + select! { + Some(_) = network_change_stream.next() => { + let _ = self.refresh_all().await; + }, + Some(command) = self.command_rx.recv() => self.handle_command(command).await, + }; + } + } - NtfyCommand::Unsubscribe { - server, - topic, - resp_tx, - } => { - let result = self.handle_unsubscribe(server, topic).await; - let _ = resp_tx.send(result); - } + async fn handle_command(&mut self, command: NtfyCommand) { + match command { + NtfyCommand::Subscribe { + server, + topic, + resp_tx, + } => { + let result = self.handle_subscribe(server, topic).await; + let _ = resp_tx.send(result); + } - NtfyCommand::RefreshAll { resp_tx } => { - let mut res = Ok(()); - for sub in self.listener_handles.read().await.values() { - res = sub.restart().await; - if res.is_err() { - break; - } - } - let _ = resp_tx.send(res); - } + NtfyCommand::Unsubscribe { + server, + topic, + resp_tx, + } => { + let result = self.handle_unsubscribe(server, topic).await; + let _ = resp_tx.send(result); + } - NtfyCommand::ListSubscriptions { resp_tx } => { - let subs = self - .listener_handles - .read() - .await - .values() - .cloned() - .collect(); - let _ = resp_tx.send(Ok(subs)); - } + NtfyCommand::RefreshAll { resp_tx } => { + let res = self.refresh_all().await; + let _ = resp_tx.send(res); + } - NtfyCommand::ListAccounts { resp_tx } => { - let accounts = self - .env - .credentials - .list_all() - .into_iter() - .map(|(server, credential)| Account { - server, - username: credential.username, - }) - .collect(); - let _ = resp_tx.send(Ok(accounts)); - } + NtfyCommand::ListSubscriptions { resp_tx } => { + let subs = self + .listener_handles + .read() + .await + .values() + .cloned() + .collect(); + let _ = resp_tx.send(Ok(subs)); + } - NtfyCommand::WatchSubscribed { resp_tx } => { - let result = self.handle_watch_subscribed().await; - let _ = resp_tx.send(result); - } + NtfyCommand::ListAccounts { resp_tx } => { + let accounts = self + .env + .credentials + .list_all() + .into_iter() + .map(|(server, credential)| Account { + server, + username: credential.username, + }) + .collect(); + let _ = resp_tx.send(Ok(accounts)); + } - NtfyCommand::AddAccount { - server, - username, - password, - resp_tx, - } => { - let result = self - .env - .credentials - .insert(&server, &username, &password) - .await; - let _ = resp_tx.send(result); - } + NtfyCommand::WatchSubscribed { resp_tx } => { + let result = self.handle_watch_subscribed().await; + let _ = resp_tx.send(result); + } - NtfyCommand::RemoveAccount { server, resp_tx } => { - let result = self.env.credentials.delete(&server).await; - let _ = resp_tx.send(result); - } + NtfyCommand::AddAccount { + server, + username, + password, + resp_tx, + } => { + let result = self + .env + .credentials + .insert(&server, &username, &password) + .await; + let _ = resp_tx.send(result); + } + + NtfyCommand::RemoveAccount { server, resp_tx } => { + let result = self.env.credentials.delete(&server).await; + let _ = resp_tx.send(result); } } } @@ -261,6 +267,17 @@ impl NtfyActor { Ok(sub) } } + + async fn refresh_all(&self) -> anyhow::Result<()> { + let mut res = Ok(()); + for sub in self.listener_handles.read().await.values() { + res = sub.restart().await; + if res.is_err() { + break; + } + } + res + } } impl NtfyHandle {