refresh on network connection changes

This commit is contained in:
ranfdev
2024-11-21 12:52:06 +01:00
parent 5002772c65
commit e60b61a623
2 changed files with 92 additions and 75 deletions

View File

@ -1,13 +1,13 @@
macro_rules! send_command { macro_rules! send_command {
($self:expr, $command:expr) => {{ ($self:expr, $command:expr) => {{
let (resp_tx, rx) = oneshot::channel(); let (resp_tx, rx) = oneshot::channel();
use anyhow::Context;
$self $self
.command_tx .command_tx
.send($command(resp_tx)) .send($command(resp_tx))
.await .await
.map_err(|_| anyhow::anyhow!("Actor mailbox error"))?; .context("Actor mailbox error")?;
rx.await rx.await.context("Actor response error")?
.map_err(|_| anyhow::anyhow!("Actor response error"))?
}}; }};
} }

View File

@ -3,7 +3,9 @@ use crate::models::NullNetworkMonitor;
use crate::models::NullNotifier; use crate::models::NullNotifier;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use futures::future::join_all; use futures::future::join_all;
use futures::StreamExt;
use std::{collections::HashMap, future::Future, sync::Arc}; use std::{collections::HashMap, future::Future, sync::Arc};
use tokio::select;
use tokio::{ use tokio::{
sync::{broadcast, mpsc, oneshot, RwLock}, sync::{broadcast, mpsc, oneshot, RwLock},
task::{spawn_local, LocalSet}, task::{spawn_local, LocalSet},
@ -135,85 +137,89 @@ impl NtfyActor {
} }
pub async fn run(&mut self) { pub async fn run(&mut self) {
while let Some(msg) = self.command_rx.recv().await { let mut network_change_stream = self.env.network_monitor.listen();
match msg { loop {
NtfyCommand::Subscribe { select! {
server, Some(_) = network_change_stream.next() => {
topic, let _ = self.refresh_all().await;
resp_tx, },
} => { Some(command) = self.command_rx.recv() => self.handle_command(command).await,
let result = self.handle_subscribe(server, topic).await; };
let _ = resp_tx.send(result); }
} }
NtfyCommand::Unsubscribe { async fn handle_command(&mut self, command: NtfyCommand) {
server, match command {
topic, NtfyCommand::Subscribe {
resp_tx, server,
} => { topic,
let result = self.handle_unsubscribe(server, topic).await; resp_tx,
let _ = resp_tx.send(result); } => {
} let result = self.handle_subscribe(server, topic).await;
let _ = resp_tx.send(result);
}
NtfyCommand::RefreshAll { resp_tx } => { NtfyCommand::Unsubscribe {
let mut res = Ok(()); server,
for sub in self.listener_handles.read().await.values() { topic,
res = sub.restart().await; resp_tx,
if res.is_err() { } => {
break; let result = self.handle_unsubscribe(server, topic).await;
} let _ = resp_tx.send(result);
} }
let _ = resp_tx.send(res);
}
NtfyCommand::ListSubscriptions { resp_tx } => { NtfyCommand::RefreshAll { resp_tx } => {
let subs = self let res = self.refresh_all().await;
.listener_handles let _ = resp_tx.send(res);
.read() }
.await
.values()
.cloned()
.collect();
let _ = resp_tx.send(Ok(subs));
}
NtfyCommand::ListAccounts { resp_tx } => { NtfyCommand::ListSubscriptions { resp_tx } => {
let accounts = self let subs = self
.env .listener_handles
.credentials .read()
.list_all() .await
.into_iter() .values()
.map(|(server, credential)| Account { .cloned()
server, .collect();
username: credential.username, let _ = resp_tx.send(Ok(subs));
}) }
.collect();
let _ = resp_tx.send(Ok(accounts));
}
NtfyCommand::WatchSubscribed { resp_tx } => { NtfyCommand::ListAccounts { resp_tx } => {
let result = self.handle_watch_subscribed().await; let accounts = self
let _ = resp_tx.send(result); .env
} .credentials
.list_all()
.into_iter()
.map(|(server, credential)| Account {
server,
username: credential.username,
})
.collect();
let _ = resp_tx.send(Ok(accounts));
}
NtfyCommand::AddAccount { NtfyCommand::WatchSubscribed { resp_tx } => {
server, let result = self.handle_watch_subscribed().await;
username, let _ = resp_tx.send(result);
password, }
resp_tx,
} => {
let result = self
.env
.credentials
.insert(&server, &username, &password)
.await;
let _ = resp_tx.send(result);
}
NtfyCommand::RemoveAccount { server, resp_tx } => { NtfyCommand::AddAccount {
let result = self.env.credentials.delete(&server).await; server,
let _ = resp_tx.send(result); username,
} password,
resp_tx,
} => {
let result = self
.env
.credentials
.insert(&server, &username, &password)
.await;
let _ = resp_tx.send(result);
}
NtfyCommand::RemoveAccount { server, resp_tx } => {
let result = self.env.credentials.delete(&server).await;
let _ = resp_tx.send(result);
} }
} }
} }
@ -261,6 +267,17 @@ impl NtfyActor {
Ok(sub) Ok(sub)
} }
} }
async fn refresh_all(&self) -> anyhow::Result<()> {
let mut res = Ok(());
for sub in self.listener_handles.read().await.values() {
res = sub.restart().await;
if res.is_err() {
break;
}
}
res
}
} }
impl NtfyHandle { impl NtfyHandle {