add proper tracing to subscription
This commit is contained in:
@ -4,8 +4,9 @@ use crate::{Error, SharedEnv};
|
|||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
use tokio::task::spawn_local;
|
use tokio::task::spawn_local;
|
||||||
use tracing::{error, info, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
enum SubscriptionCommand {
|
enum SubscriptionCommand {
|
||||||
GetModel {
|
GetModel {
|
||||||
resp_tx: oneshot::Sender<models::Subscription>,
|
resp_tx: oneshot::Sender<models::Subscription>,
|
||||||
@ -140,6 +141,7 @@ impl SubscriptionActor {
|
|||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
Ok(event) = self.listener.events.recv() => {
|
Ok(event) = self.listener.events.recv() => {
|
||||||
|
debug!(?event, "received listener event");
|
||||||
match event {
|
match event {
|
||||||
ListenerEvent::Message(msg) => self.handle_msg_event(msg),
|
ListenerEvent::Message(msg) => self.handle_msg_event(msg),
|
||||||
other => {
|
other => {
|
||||||
@ -148,14 +150,17 @@ impl SubscriptionActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(command) = self.command_rx.recv() => {
|
Some(command) = self.command_rx.recv() => {
|
||||||
|
trace!(?command, "processing subscription command");
|
||||||
match command {
|
match command {
|
||||||
SubscriptionCommand::GetModel { resp_tx } => {
|
SubscriptionCommand::GetModel { resp_tx } => {
|
||||||
|
debug!("getting subscription model");
|
||||||
let _ = resp_tx.send(self.model.clone());
|
let _ = resp_tx.send(self.model.clone());
|
||||||
}
|
}
|
||||||
SubscriptionCommand::UpdateInfo {
|
SubscriptionCommand::UpdateInfo {
|
||||||
mut new_model,
|
mut new_model,
|
||||||
resp_tx,
|
resp_tx,
|
||||||
} => {
|
} => {
|
||||||
|
debug!(server=?new_model.server, topic=?new_model.topic, "updating subscription info");
|
||||||
new_model.server = self.model.server.clone();
|
new_model.server = self.model.server.clone();
|
||||||
new_model.topic = self.model.topic.clone();
|
new_model.topic = self.model.topic.clone();
|
||||||
let res = self.env.db.update_subscription(new_model.clone());
|
let res = self.env.db.update_subscription(new_model.clone());
|
||||||
@ -165,9 +170,11 @@ impl SubscriptionActor {
|
|||||||
let _ = resp_tx.send(res.map_err(|e| e.into()));
|
let _ = resp_tx.send(res.map_err(|e| e.into()));
|
||||||
}
|
}
|
||||||
SubscriptionCommand::Publish {msg, resp_tx} => {
|
SubscriptionCommand::Publish {msg, resp_tx} => {
|
||||||
|
debug!(topic=?self.model.topic, "publishing message");
|
||||||
let _ = resp_tx.send(self.publish(msg).await);
|
let _ = resp_tx.send(self.publish(msg).await);
|
||||||
}
|
}
|
||||||
SubscriptionCommand::Attach { resp_tx } => {
|
SubscriptionCommand::Attach { resp_tx } => {
|
||||||
|
debug!(topic=?self.model.topic, "attaching new listener");
|
||||||
let messages = self
|
let messages = self
|
||||||
.env
|
.env
|
||||||
.db
|
.db
|
||||||
@ -191,9 +198,11 @@ impl SubscriptionActor {
|
|||||||
let _ = resp_tx.send((previous_events, self.broadcast_tx.subscribe()));
|
let _ = resp_tx.send((previous_events, self.broadcast_tx.subscribe()));
|
||||||
}
|
}
|
||||||
SubscriptionCommand::ClearNotifications {resp_tx} => {
|
SubscriptionCommand::ClearNotifications {resp_tx} => {
|
||||||
|
debug!(topic=?self.model.topic, "clearing notifications");
|
||||||
let _ = resp_tx.send(self.env.db.delete_messages(&self.model.server, &self.model.topic).map_err(|e| anyhow::anyhow!(e)));
|
let _ = resp_tx.send(self.env.db.delete_messages(&self.model.server, &self.model.topic).map_err(|e| anyhow::anyhow!(e)));
|
||||||
}
|
}
|
||||||
SubscriptionCommand::UpdateReadUntil { timestamp, resp_tx } => {
|
SubscriptionCommand::UpdateReadUntil { timestamp, resp_tx } => {
|
||||||
|
debug!(topic=?self.model.topic, timestamp=timestamp, "updating read until timestamp");
|
||||||
let res = self.env.db.update_read_until(&self.model.server, &self.model.topic, timestamp);
|
let res = self.env.db.update_read_until(&self.model.server, &self.model.topic, timestamp);
|
||||||
let _ = resp_tx.send(res.map_err(|e| anyhow::anyhow!(e)));
|
let _ = resp_tx.send(res.map_err(|e| anyhow::anyhow!(e)));
|
||||||
}
|
}
|
||||||
@ -205,35 +214,42 @@ impl SubscriptionActor {
|
|||||||
|
|
||||||
async fn publish(&self, msg: String) -> anyhow::Result<()> {
|
async fn publish(&self, msg: String) -> anyhow::Result<()> {
|
||||||
let server = &self.model.server;
|
let server = &self.model.server;
|
||||||
|
debug!(server=?server, "preparing to publish message");
|
||||||
let creds = self.env.credentials.get(server);
|
let creds = self.env.credentials.get(server);
|
||||||
let mut req = self.env.http_client.post(server);
|
let mut req = self.env.http_client.post(server);
|
||||||
if let Some(creds) = creds {
|
if let Some(creds) = creds {
|
||||||
req = req.basic_auth(creds.username, Some(creds.password));
|
req = req.basic_auth(creds.username, Some(creds.password));
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("sending message");
|
info!(server=?server, "sending message");
|
||||||
let res = req.body(msg).send().await?;
|
let res = req.body(msg).send().await?;
|
||||||
res.error_for_status()?;
|
res.error_for_status()?;
|
||||||
|
debug!(server=?server, "message published successfully");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn handle_msg_event(&mut self, msg: ReceivedMessage) {
|
fn handle_msg_event(&mut self, msg: ReceivedMessage) {
|
||||||
|
debug!(topic=?self.model.topic, "handling new message");
|
||||||
// Store in database
|
// Store in database
|
||||||
let already_stored: bool = {
|
let already_stored: bool = {
|
||||||
let json_ev = &serde_json::to_string(&msg).unwrap();
|
let json_ev = &serde_json::to_string(&msg).unwrap();
|
||||||
match self.env.db.insert_message(&self.model.server, json_ev) {
|
match self.env.db.insert_message(&self.model.server, json_ev) {
|
||||||
Err(Error::DuplicateMessage) => {
|
Err(Error::DuplicateMessage) => {
|
||||||
warn!("Received duplicate message");
|
warn!(topic=?self.model.topic, "received duplicate message");
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(error = ?e, "Can't store the message");
|
error!(error=?e, topic=?self.model.topic, "can't store the message");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
debug!(topic=?self.model.topic, "message stored successfully");
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
_ => false,
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if !already_stored {
|
if !already_stored {
|
||||||
|
debug!(topic=?self.model.topic, muted=?self.model.muted, "checking if notification should be shown");
|
||||||
// Show notification. If this fails, panic
|
// Show notification. If this fails, panic
|
||||||
if !{ self.model.muted } {
|
if !{ self.model.muted } {
|
||||||
let notifier = self.env.notifier.clone();
|
let notifier = self.env.notifier.clone();
|
||||||
@ -246,11 +262,14 @@ impl SubscriptionActor {
|
|||||||
actions: msg.actions.clone(),
|
actions: msg.actions.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Showing notification");
|
info!(topic=?self.model.topic, "showing notification");
|
||||||
notifier.send(n).unwrap();
|
notifier.send(n).unwrap();
|
||||||
|
} else {
|
||||||
|
debug!(topic=?self.model.topic, "notification muted, skipping");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forward to app
|
// Forward to app
|
||||||
|
debug!(topic=?self.model.topic, "forwarding message to app");
|
||||||
let _ = self.broadcast_tx.send(ListenerEvent::Message(msg));
|
let _ = self.broadcast_tx.send(ListenerEvent::Message(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -165,7 +165,7 @@ impl Subscription {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_event(&self, ev: ListenerEvent) {
|
fn handle_event(&self, ev: ListenerEvent) {
|
||||||
match dbg!(ev) {
|
match ev {
|
||||||
ListenerEvent::Message(msg) => {
|
ListenerEvent::Message(msg) => {
|
||||||
self.imp().messages.append(&glib::BoxedAnyObject::new(msg));
|
self.imp().messages.append(&glib::BoxedAnyObject::new(msg));
|
||||||
self.update_unread_count();
|
self.update_unread_count();
|
||||||
|
|||||||
Reference in New Issue
Block a user