From b3d0aaf2777b0d77efb6d9e750e89f221f000be2 Mon Sep 17 00:00:00 2001 From: ranfdev Date: Thu, 21 Nov 2024 12:28:58 +0100 Subject: [PATCH] minor refactors, add OutgoingMessage --- ntfy-daemon/src/actor_utils.rs | 14 ++ ntfy-daemon/src/lib.rs | 3 + ntfy-daemon/src/listener.rs | 20 +-- ntfy-daemon/src/models.rs | 41 +++++- ntfy-daemon/src/ntfy.rs | 179 +++++++++---------------- ntfy-daemon/src/subscription.rs | 107 ++++++++------- src/subscription.rs | 6 +- src/widgets/add_subscription_dialog.rs | 4 +- src/widgets/message_row.rs | 4 +- src/widgets/window.rs | 14 +- 10 files changed, 184 insertions(+), 208 deletions(-) create mode 100644 ntfy-daemon/src/actor_utils.rs diff --git a/ntfy-daemon/src/actor_utils.rs b/ntfy-daemon/src/actor_utils.rs new file mode 100644 index 0000000..7c00a81 --- /dev/null +++ b/ntfy-daemon/src/actor_utils.rs @@ -0,0 +1,14 @@ +macro_rules! send_command { + ($self:expr, $command:expr) => {{ + let (resp_tx, rx) = oneshot::channel(); + $self + .command_tx + .send($command(resp_tx)) + .await + .map_err(|_| anyhow::anyhow!("Actor mailbox error"))?; + rx.await + .map_err(|_| anyhow::anyhow!("Actor response error"))? + }}; +} + +pub(crate) use send_command; diff --git a/ntfy-daemon/src/lib.rs b/ntfy-daemon/src/lib.rs index e1e834e..f5cf0d7 100644 --- a/ntfy-daemon/src/lib.rs +++ b/ntfy-daemon/src/lib.rs @@ -1,3 +1,4 @@ +mod actor_utils; pub mod credentials; mod http_client; mod listener; @@ -31,6 +32,8 @@ pub enum Error { InvalidTopic(String), #[error("invalid server base url {0:?}")] InvalidServer(#[from] url::ParseError), + #[error("multiple errors in subscription model: {0:?}")] + InvalidSubscription(Vec), #[error("duplicate message")] DuplicateMessage, #[error("can't parse the minimum set of required fields from the message {0}")] diff --git a/ntfy-daemon/src/listener.rs b/ntfy-daemon/src/listener.rs index 68a37e6..30a437d 100644 --- a/ntfy-daemon/src/listener.rs +++ b/ntfy-daemon/src/listener.rs @@ -36,7 +36,7 @@ pub enum ServerEvent { topic: String, }, #[serde(rename = "message")] - Message(models::Message), + Message(models::ReceivedMessage), #[serde(rename = "keepalive")] KeepAlive { id: String, @@ -48,7 +48,7 @@ pub enum ServerEvent { #[derive(Debug, Clone)] pub enum ListenerEvent { - Message(models::Message), + Message(models::ReceivedMessage), ConnectionStateChanged(ConnectionState), } @@ -281,7 +281,7 @@ impl ListenerHandle { } // the response will be sent as an event in self.events - pub async fn request_state(&self) -> ConnectionState { + pub async fn state(&self) -> ConnectionState { let (tx, rx) = oneshot::channel(); self.commands .send(ListenerCommand::GetState(tx)) @@ -300,20 +300,6 @@ mod tests { use super::*; - // takes a list of pattern matches. It recvs events and then matches them - // against the macro parameters - macro_rules! assert_event_matches { - ($listener:expr, $( $pattern:pat_param ),+ $(,)?) => { - $( - $listener.events.changed().await.unwrap(); - let event = $listener.events.borrow().clone(); - - panic!("{:?}", &event); - assert!(matches!(event, $pattern)); - )+ - }; - } - #[tokio::test] async fn test_listener_reconnects_on_http_status_500() { let local_set = LocalSet::new(); diff --git a/ntfy-daemon/src/models.rs b/ntfy-daemon/src/models.rs index 586f717..e607905 100644 --- a/ntfy-daemon/src/models.rs +++ b/ntfy-daemon/src/models.rs @@ -27,7 +27,7 @@ pub fn validate_topic(topic: &str) -> Result<&str, Error> { } #[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct Message { +pub struct ReceivedMessage { pub id: String, pub topic: String, pub expires: Option, @@ -59,7 +59,7 @@ pub struct Message { pub actions: Vec, } -impl Message { +impl ReceivedMessage { fn extend_with_emojis(&self, text: &mut String) { // Add emojis for t in &self.tags { @@ -107,6 +107,37 @@ impl Message { } } +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +pub struct OutgoingMessage { + pub topic: String, + pub message: Option, + #[serde(default = "Default::default")] + pub time: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub title: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub tags: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub priority: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub attachment: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub icon: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub filename: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub delay: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub email: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub call: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Vec::is_empty")] + pub actions: Vec, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MinMessage { pub id: String, @@ -167,7 +198,7 @@ impl Subscription { .push("auth"); Ok(url) } - pub fn validate(self) -> Result> { + pub fn validate(self) -> Result { let mut errs = vec![]; if let Err(e) = validate_topic(&self.topic) { errs.push(e); @@ -176,7 +207,7 @@ impl Subscription { errs.push(e); }; if !errs.is_empty() { - return Err(errs); + return Err(Error::InvalidSubscription(errs)); } Ok(self) } @@ -239,7 +270,7 @@ impl SubscriptionBuilder { self } - pub fn build(self) -> Result> { + pub fn build(self) -> Result { let res = Subscription { server: self.server, topic: self.topic, diff --git a/ntfy-daemon/src/ntfy.rs b/ntfy-daemon/src/ntfy.rs index e2f9307..a5f0e5c 100644 --- a/ntfy-daemon/src/ntfy.rs +++ b/ntfy-daemon/src/ntfy.rs @@ -1,3 +1,4 @@ +use crate::actor_utils::send_command; use crate::models::NullNetworkMonitor; use crate::models::NullNotifier; use anyhow::{anyhow, Context}; @@ -36,40 +37,39 @@ pub fn build_client() -> anyhow::Result { // Message types for the actor #[derive()] -pub enum NtfyMessage { +pub enum NtfyCommand { Subscribe { server: String, topic: String, - respond_to: oneshot::Sender>>, + resp_tx: oneshot::Sender>, }, Unsubscribe { server: String, topic: String, - respond_to: oneshot::Sender>, + resp_tx: oneshot::Sender>, }, RefreshAll { - respond_to: oneshot::Sender>, + resp_tx: oneshot::Sender>, }, ListSubscriptions { - respond_to: oneshot::Sender>>, + resp_tx: oneshot::Sender>>, }, ListAccounts { - respond_to: oneshot::Sender>>, + resp_tx: oneshot::Sender>>, }, WatchSubscribed { - respond_to: oneshot::Sender>, + resp_tx: oneshot::Sender>, }, AddAccount { server: String, username: String, password: String, - respond_to: oneshot::Sender>, + resp_tx: oneshot::Sender>, }, RemoveAccount { server: String, - respond_to: oneshot::Sender>, + resp_tx: oneshot::Sender>, }, - Shutdown, } #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -81,12 +81,12 @@ pub struct WatchKey { pub struct NtfyActor { listener_handles: Arc>>, env: SharedEnv, - command_rx: mpsc::Receiver, + command_rx: mpsc::Receiver, } #[derive(Clone)] pub struct NtfyHandle { - command_tx: mpsc::Sender, + command_tx: mpsc::Sender, } impl NtfyActor { @@ -108,19 +108,15 @@ impl NtfyActor { &self, server: String, topic: String, - ) -> Result> { + ) -> Result { let subscription = models::Subscription::builder(topic.clone()) .server(server.clone()) - .build() - .map_err(|e| e.into_iter().map(|e| anyhow!(e)).collect::>())?; + .build()?; let mut db = self.env.db.clone(); - db.insert_subscription(subscription.clone()) - .map_err(|e| vec![anyhow!(e)])?; + db.insert_subscription(subscription.clone())?; - self.listen(subscription) - .await - .map_err(|e| vec![anyhow!(e)]) + self.listen(subscription).await } async fn handle_unsubscribe(&mut self, server: String, topic: String) -> anyhow::Result<()> { @@ -141,25 +137,25 @@ impl NtfyActor { pub async fn run(&mut self) { while let Some(msg) = self.command_rx.recv().await { match msg { - NtfyMessage::Subscribe { + NtfyCommand::Subscribe { server, topic, - respond_to, + resp_tx, } => { let result = self.handle_subscribe(server, topic).await; - let _ = respond_to.send(result); + let _ = resp_tx.send(result); } - NtfyMessage::Unsubscribe { + NtfyCommand::Unsubscribe { server, topic, - respond_to, + resp_tx, } => { let result = self.handle_unsubscribe(server, topic).await; - let _ = respond_to.send(result); + let _ = resp_tx.send(result); } - NtfyMessage::RefreshAll { respond_to } => { + NtfyCommand::RefreshAll { resp_tx } => { let mut res = Ok(()); for sub in self.listener_handles.read().await.values() { res = sub.restart().await; @@ -167,10 +163,10 @@ impl NtfyActor { break; } } - let _ = respond_to.send(res); + let _ = resp_tx.send(res); } - NtfyMessage::ListSubscriptions { respond_to } => { + NtfyCommand::ListSubscriptions { resp_tx } => { let subs = self .listener_handles .read() @@ -178,10 +174,10 @@ impl NtfyActor { .values() .cloned() .collect(); - let _ = respond_to.send(Ok(subs)); + let _ = resp_tx.send(Ok(subs)); } - NtfyMessage::ListAccounts { respond_to } => { + NtfyCommand::ListAccounts { resp_tx } => { let accounts = self .env .credentials @@ -192,34 +188,32 @@ impl NtfyActor { username: credential.username, }) .collect(); - let _ = respond_to.send(Ok(accounts)); + let _ = resp_tx.send(Ok(accounts)); } - NtfyMessage::WatchSubscribed { respond_to } => { + NtfyCommand::WatchSubscribed { resp_tx } => { let result = self.handle_watch_subscribed().await; - let _ = respond_to.send(result); + let _ = resp_tx.send(result); } - NtfyMessage::AddAccount { + NtfyCommand::AddAccount { server, username, password, - respond_to, + resp_tx, } => { let result = self .env .credentials .insert(&server, &username, &password) .await; - let _ = respond_to.send(result); + let _ = resp_tx.send(result); } - NtfyMessage::RemoveAccount { server, respond_to } => { + NtfyCommand::RemoveAccount { server, resp_tx } => { let result = self.env.credentials.delete(&server).await; - let _ = respond_to.send(result); + let _ = resp_tx.send(result); } - - NtfyMessage::Shutdown => break, } } } @@ -274,73 +268,36 @@ impl NtfyHandle { &self, server: &str, topic: &str, - ) -> Result> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::Subscribe { - server: server.to_string(), - topic: topic.to_string(), - respond_to: tx, - }) - .await - .map_err(|_| vec![anyhow!("Actor mailbox error")])?; - - rx.await - .map_err(|_| vec![anyhow!("Actor response error")])? + ) -> Result { + send_command!(self, |resp_tx| NtfyCommand::Subscribe { + server: server.to_string(), + topic: topic.to_string(), + resp_tx, + }) } pub async fn unsubscribe(&self, server: &str, topic: &str) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::Unsubscribe { - server: server.to_string(), - topic: topic.to_string(), - respond_to: tx, - }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::Unsubscribe { + server: server.to_string(), + topic: topic.to_string(), + resp_tx, + }) } pub async fn refresh_all(&self) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::RefreshAll { respond_to: tx }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::RefreshAll { resp_tx }) } pub async fn list_subscriptions(&self) -> anyhow::Result> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::ListSubscriptions { respond_to: tx }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::ListSubscriptions { resp_tx }) } pub async fn list_accounts(&self) -> anyhow::Result> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::ListAccounts { respond_to: tx }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::ListAccounts { resp_tx }) } pub async fn watch_subscribed(&self) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::WatchSubscribed { respond_to: tx }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::WatchSubscribed { resp_tx }) } pub async fn add_account( @@ -349,31 +306,19 @@ impl NtfyHandle { username: &str, password: &str, ) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::AddAccount { - server: server.to_string(), - username: username.to_string(), - password: password.to_string(), - respond_to: tx, - }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::AddAccount { + server: server.to_string(), + username: username.to_string(), + password: password.to_string(), + resp_tx, + }) } pub async fn remove_account(&self, server: &str) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); - self.command_tx - .send(NtfyMessage::RemoveAccount { - server: server.to_string(), - respond_to: tx, - }) - .await - .map_err(|_| anyhow!("Actor mailbox error"))?; - - rx.await.map_err(|_| anyhow!("Actor response error"))? + send_command!(self, |resp_tx| NtfyCommand::RemoveAccount { + server: server.to_string(), + resp_tx, + }) } } @@ -438,7 +383,7 @@ pub fn start( mod tests { use std::time::Duration; - use models::Message; + use models::{OutgoingMessage, ReceivedMessage}; use tokio::time::sleep; use crate::ListenerEvent; @@ -466,7 +411,7 @@ mod tests { let subscription_handle = handle.subscribe(server, topic).await.unwrap(); // Publish a message - let message = serde_json::to_string(&Message { + let message = serde_json::to_string(&OutgoingMessage { topic: topic.to_string(), ..Default::default() }) diff --git a/ntfy-daemon/src/subscription.rs b/ntfy-daemon/src/subscription.rs index 286e0d3..e26280d 100644 --- a/ntfy-daemon/src/subscription.rs +++ b/ntfy-daemon/src/subscription.rs @@ -1,6 +1,6 @@ use crate::listener::{ListenerEvent, ListenerHandle}; use crate::message_repo::Db; -use crate::models::{self, Message, NotificationProxy}; +use crate::models::{self, NotificationProxy, ReceivedMessage}; use crate::{Error, ServerEvent, SharedEnv}; use std::future::Future; use std::sync::Arc; @@ -9,31 +9,58 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch, RwLock}; use tokio::task::spawn_local; use tracing::{error, info, warn}; +enum SubscriptionCommand { + GetModel { + resp_tx: oneshot::Sender, + }, + UpdateInfo { + new_model: models::Subscription, + resp_tx: oneshot::Sender>, + }, + Attach { + resp_tx: oneshot::Sender<(Vec, broadcast::Receiver)>, + }, + Publish { + msg: String, + resp_tx: oneshot::Sender>, + }, + ClearNotifications { + resp_tx: oneshot::Sender>, + }, + UpdateReadUntil { + timestamp: u64, + resp_tx: oneshot::Sender>, + }, +} + #[derive(Clone)] pub struct SubscriptionHandle { - sender: mpsc::Sender, + command_tx: mpsc::Sender, listener: ListenerHandle, } impl SubscriptionHandle { pub fn new(listener: ListenerHandle, model: models::Subscription, env: &SharedEnv) -> Self { - let (sender, receiver) = mpsc::channel(32); + let (command_tx, command_rx) = mpsc::channel(32); let broadcast_tx = broadcast::channel(8).0; let actor = SubscriptionActor { listener: listener.clone(), model, - receiver, + command_rx, env: env.clone(), broadcast_tx: broadcast_tx.clone(), }; spawn_local(actor.run()); - Self { sender, listener } + Self { + command_tx, + listener, + } } pub async fn model(&self) -> models::Subscription { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::GetModel { resp_tx }) + self.command_tx + .send(SubscriptionCommand::GetModel { resp_tx }) .await .unwrap(); resp_rx.await.unwrap() @@ -41,8 +68,8 @@ impl SubscriptionHandle { pub async fn update_info(&self, new_model: models::Subscription) -> anyhow::Result<()> { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::UpdateInfo { new_model, resp_tx }) + self.command_tx + .send(SubscriptionCommand::UpdateInfo { new_model, resp_tx }) .await?; resp_rx.await.unwrap() } @@ -68,8 +95,8 @@ impl SubscriptionHandle { // The `ListenerHandle` is returned to receive new events. pub async fn attach(&self) -> (Vec, broadcast::Receiver) { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::Attach { resp_tx }) + self.command_tx + .send(SubscriptionCommand::Attach { resp_tx }) .await .unwrap(); resp_rx.await.unwrap() @@ -77,8 +104,8 @@ impl SubscriptionHandle { pub async fn publish(&self, msg: String) -> anyhow::Result<()> { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::Publish { msg, resp_tx }) + self.command_tx + .send(SubscriptionCommand::Publish { msg, resp_tx }) .await .unwrap(); resp_rx.await.unwrap() @@ -86,8 +113,8 @@ impl SubscriptionHandle { pub async fn clear_notifications(&self) -> anyhow::Result<()> { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::ClearNotifications { resp_tx }) + self.command_tx + .send(SubscriptionCommand::ClearNotifications { resp_tx }) .await .unwrap(); resp_rx.await.unwrap() @@ -95,8 +122,8 @@ impl SubscriptionHandle { pub async fn update_read_until(&self, timestamp: u64) -> anyhow::Result<()> { let (resp_tx, resp_rx) = oneshot::channel(); - self.sender - .send(SubscriptionRequest::UpdateReadUntil { timestamp, resp_tx }) + self.command_tx + .send(SubscriptionCommand::UpdateReadUntil { timestamp, resp_tx }) .await .unwrap(); resp_rx.await.unwrap() @@ -106,7 +133,7 @@ impl SubscriptionHandle { struct SubscriptionActor { listener: ListenerHandle, model: models::Subscription, - receiver: mpsc::Receiver, + command_rx: mpsc::Receiver, env: SharedEnv, broadcast_tx: broadcast::Sender, } @@ -123,12 +150,12 @@ impl SubscriptionActor { } } } - Some(request) = self.receiver.recv() => { - match request { - SubscriptionRequest::GetModel { resp_tx } => { + Some(command) = self.command_rx.recv() => { + match command { + SubscriptionCommand::GetModel { resp_tx } => { let _ = resp_tx.send(self.model.clone()); } - SubscriptionRequest::UpdateInfo { + SubscriptionCommand::UpdateInfo { mut new_model, resp_tx, } => { @@ -140,10 +167,10 @@ impl SubscriptionActor { } resp_tx.send(res.map_err(|e| e.into())); } - SubscriptionRequest::Publish {msg, resp_tx} => { + SubscriptionCommand::Publish {msg, resp_tx} => { let _ = resp_tx.send(self.publish(msg).await); } - SubscriptionRequest::Attach { resp_tx } => { + SubscriptionCommand::Attach { resp_tx } => { let messages = self .env .db @@ -163,13 +190,13 @@ impl SubscriptionActor { }) .map(ListenerEvent::Message) .collect(); - previous_events.push(ListenerEvent::ConnectionStateChanged(self.listener.request_state().await)); + previous_events.push(ListenerEvent::ConnectionStateChanged(self.listener.state().await)); let _ = resp_tx.send((previous_events, self.broadcast_tx.subscribe())); } - SubscriptionRequest::ClearNotifications {resp_tx} => { + SubscriptionCommand::ClearNotifications {resp_tx} => { let _ = resp_tx.send(self.env.db.delete_messages(&self.model.server, &self.model.topic).map_err(|e| anyhow::anyhow!(e))); } - SubscriptionRequest::UpdateReadUntil { timestamp, resp_tx } => { + SubscriptionCommand::UpdateReadUntil { timestamp, resp_tx } => { let res = self.env.db.update_read_until(&self.model.server, &self.model.topic, timestamp); let _ = resp_tx.send(res.map_err(|e| anyhow::anyhow!(e))); } @@ -192,7 +219,7 @@ impl SubscriptionActor { res.error_for_status()?; Ok(()) } - fn handle_msg_event(&mut self, msg: Message) { + fn handle_msg_event(&mut self, msg: ReceivedMessage) { // Store in database let already_stored: bool = { let json_ev = &serde_json::to_string(&msg).unwrap(); @@ -231,27 +258,3 @@ impl SubscriptionActor { } } } - -enum SubscriptionRequest { - GetModel { - resp_tx: oneshot::Sender, - }, - UpdateInfo { - new_model: models::Subscription, - resp_tx: oneshot::Sender>, - }, - Attach { - resp_tx: oneshot::Sender<(Vec, broadcast::Receiver)>, - }, - Publish { - msg: String, - resp_tx: oneshot::Sender>, - }, - ClearNotifications { - resp_tx: oneshot::Sender>, - }, - UpdateReadUntil { - timestamp: u64, - resp_tx: oneshot::Sender>, - }, -} diff --git a/src/subscription.rs b/src/subscription.rs index 871dcd8..c0aa171 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -227,12 +227,12 @@ impl Subscription { .await?; Ok(()) } - fn last_message(list: &gio::ListStore) -> Option { + fn last_message(list: &gio::ListStore) -> Option { let n = list.n_items(); let last = list .item(n.checked_sub(1)?) .and_downcast::()?; - let last = last.borrow::(); + let last = last.borrow::(); Some(last.clone()) } fn update_unread_count(&self) { @@ -275,7 +275,7 @@ impl Subscription { Ok(()) } - pub async fn publish_msg(&self, mut msg: models::Message) -> anyhow::Result<()> { + pub async fn publish_msg(&self, mut msg: models::OutgoingMessage) -> anyhow::Result<()> { let imp = self.imp(); let json = { msg.topic = self.topic(); diff --git a/src/widgets/add_subscription_dialog.rs b/src/widgets/add_subscription_dialog.rs index 004b176..a29a3b2 100644 --- a/src/widgets/add_subscription_dialog.rs +++ b/src/widgets/add_subscription_dialog.rs @@ -166,7 +166,7 @@ impl AddSubscriptionDialog { obj.set_content_width(480); obj.set_child(Some(&toolbar_view)); } - pub fn subscription(&self) -> Result> { + pub fn subscription(&self) -> Result { let w = { self.imp().widgets.borrow().clone() }; let mut sub = models::Subscription::builder(w.topic_entry.text().to_string()); if w.server_expander.enables_expansion() { @@ -183,7 +183,7 @@ impl AddSubscriptionDialog { w.topic_entry.remove_css_class("error"); w.sub_btn.set_sensitive(true); - if let Err(errs) = sub { + if let Err(ntfy_daemon::Error::InvalidSubscription(errs)) = sub { w.sub_btn.set_sensitive(false); for e in errs { match e { diff --git a/src/widgets/message_row.rs b/src/widgets/message_row.rs index 8f30606..e2c375f 100644 --- a/src/widgets/message_row.rs +++ b/src/widgets/message_row.rs @@ -34,12 +34,12 @@ glib::wrapper! { } impl MessageRow { - pub fn new(msg: models::Message) -> Self { + pub fn new(msg: models::ReceivedMessage) -> Self { let this: Self = glib::Object::new(); this.build_ui(msg); this } - fn build_ui(&self, msg: models::Message) { + fn build_ui(&self, msg: models::ReceivedMessage) { self.set_margin_top(8); self.set_margin_bottom(8); self.set_margin_start(8); diff --git a/src/widgets/window.rs b/src/widgets/window.rs index 21eaedb..5db0a46 100644 --- a/src/widgets/window.rs +++ b/src/widgets/window.rs @@ -226,9 +226,9 @@ impl NotifyWindow { entry.error_boundary().spawn(async move { this.selected_subscription() .unwrap() - .publish_msg(models::Message { + .publish_msg(models::OutgoingMessage { message: Some(entry.text().as_str().to_string()), - ..models::Message::default() + ..models::OutgoingMessage::default() }) .await?; Ok(()) @@ -266,13 +266,7 @@ impl NotifyWindow { fn add_subscription(&self, sub: models::Subscription) { let this = self.clone(); self.error_boundary().spawn(async move { - let sub = this - .notifier() - .subscribe(&sub.server, &sub.topic) - .await - .map_err(|err| { - anyhow::anyhow!(err.into_iter().map(|x| x.to_string()).collect::()) - })?; + let sub = this.notifier().subscribe(&sub.server, &sub.topic).await?; let imp = this.imp(); // Subscription::new will use the pipelined client to retrieve info about the subscription @@ -371,7 +365,7 @@ impl NotifyWindow { imp.message_list .bind_model(Some(&sub.imp().messages), move |obj| { let b = obj.downcast_ref::().unwrap(); - let msg = b.borrow::(); + let msg = b.borrow::(); MessageRow::new(msg.clone()).upcast() });