From eac70af6a0de78b096ff5356edbd27799990ee09 Mon Sep 17 00:00:00 2001 From: ranfdev Date: Thu, 21 Nov 2024 16:49:15 +0100 Subject: [PATCH] add tracing to listener --- ntfy-daemon/src/actor_utils.rs | 4 +- ntfy-daemon/src/listener.rs | 238 ++++++++++++++++----------------- ntfy-daemon/src/models.rs | 2 +- 3 files changed, 120 insertions(+), 124 deletions(-) diff --git a/ntfy-daemon/src/actor_utils.rs b/ntfy-daemon/src/actor_utils.rs index cbbdce4..bafb58b 100644 --- a/ntfy-daemon/src/actor_utils.rs +++ b/ntfy-daemon/src/actor_utils.rs @@ -1,13 +1,13 @@ macro_rules! send_command { ($self:expr, $command:expr) => {{ - let (resp_tx, rx) = oneshot::channel(); + let (resp_tx, resp_rx) = oneshot::channel(); use anyhow::Context; $self .command_tx .send($command(resp_tx)) .await .context("Actor mailbox error")?; - rx.await.context("Actor response error")? + resp_rx.await.context("Actor response error")? }}; } diff --git a/ntfy-daemon/src/listener.rs b/ntfy-daemon/src/listener.rs index de3e59a..49141fa 100644 --- a/ntfy-daemon/src/listener.rs +++ b/ntfy-daemon/src/listener.rs @@ -10,7 +10,7 @@ use tokio::{ sync::{mpsc, oneshot}, }; use tokio_stream::wrappers::LinesStream; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn, Instrument, Span}; use crate::credentials::Credentials; use crate::http_client::HttpClient; @@ -106,36 +106,43 @@ pub struct ListenerActor { impl ListenerActor { pub async fn run_loop(mut self) { - let mut commands_rx = self.commands_rx.take().unwrap(); - loop { - select! { - _ = self.run_supervised_loop() => { - // the supervised loop cannot fail. If it finished, don't restart. - break; - }, - cmd = commands_rx.recv() => { - match cmd { - Some(ListenerCommand::Restart) => { - info!("Received restart command"); - continue; - } - Some(ListenerCommand::Shutdown) => { - info!("Received shutdown command"); - break; - } - Some(ListenerCommand::GetState(tx)) => { - info!("Received get state command"); - let state = self.state.clone(); - let _ = tx.send(state); - } - None => { - error!("Channel closed for ListenerActor"); - break; + let span = tracing::info_span!("listener_loop", topic = %self.config.topic); + async { + let mut commands_rx = self.commands_rx.take().unwrap(); + loop { + select! { + _ = self.run_supervised_loop() => { + info!("supervised loop ended"); + break; + }, + cmd = commands_rx.recv() => { + match cmd { + Some(ListenerCommand::Restart) => { + info!("restarting listener"); + continue; + } + Some(ListenerCommand::Shutdown) => { + info!("shutting down listener"); + break; + } + Some(ListenerCommand::GetState(tx)) => { + debug!("getting listener state"); + let state = self.state.clone(); + if tx.send(state).is_err() { + warn!("failed to send state - receiver dropped"); + } + } + None => { + error!("command channel closed"); + break; + } } } } } } + .instrument(span) + .await; } async fn set_state(&mut self, state: ConnectionState) { @@ -146,88 +153,106 @@ impl ListenerActor { .unwrap(); } async fn run_supervised_loop(&mut self) { - dbg!("supervised"); - let retrier = || { - crate::retry::WaitExponentialRandom::builder() - .min(Duration::from_secs(1)) - .max(Duration::from_secs(5 * 60)) - .build() - }; - let mut retry = retrier(); - loop { - let start_time = std::time::Instant::now(); + let span = tracing::info_span!("supervised_loop"); + async { + let retrier = || { + crate::retry::WaitExponentialRandom::builder() + .min(Duration::from_secs(1)) + .max(Duration::from_secs(5 * 60)) + .build() + }; + let mut retry = retrier(); + loop { + let start_time = std::time::Instant::now(); - if let Err(e) = self.recv_and_forward_loop().await { - let uptime = std::time::Instant::now().duration_since(start_time); - // Reset retry delay to minimum if uptime was decent enough - if uptime > Duration::from_secs(60 * 4) { - retry = retrier(); + if let Err(e) = self.recv_and_forward_loop().await { + let uptime = std::time::Instant::now().duration_since(start_time); + // Reset retry delay to minimum if uptime was decent enough + if uptime > Duration::from_secs(60 * 4) { + debug!("resetting retry delay due to sufficient uptime"); + retry = retrier(); + } + error!(error = ?e, "connection error"); + self.set_state(ConnectionState::Reconnecting { + retry_count: retry.count(), + delay: retry.next_delay(), + error: Some(Arc::new(e)), + }) + .await; + info!(delay = ?retry.next_delay(), "waiting before reconnect attempt"); + retry.wait().await; + } else { + break; } - error!(error = ?e); - self.set_state(ConnectionState::Reconnecting { - retry_count: retry.count(), - delay: retry.next_delay(), - error: Some(Arc::new(e)), - }) - .await; - info!(delay = ?retry.next_delay(), "restarting"); - retry.wait().await; - } else { - break; } } + .instrument(span) + .await; } async fn recv_and_forward_loop(&mut self) -> anyhow::Result<()> { - let creds = self.config.credentials.get(&self.config.endpoint); - let req = topic_request( - &self.config.http_client, - &self.config.endpoint, - &self.config.topic, - self.config.since, - creds.as_ref().map(|x| x.username.as_str()), - creds.as_ref().map(|x| x.password.as_str()), + let span = tracing::info_span!("receive_loop", + endpoint = %self.config.endpoint, + topic = %self.config.topic, + since = %self.config.since ); - let res = self.config.http_client.execute(req?).await?; - let res = res.error_for_status()?; - let reader = tokio_util::io::StreamReader::new( - res.bytes_stream() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())), - ); - let stream = response_lines(reader).await?; - tokio::pin!(stream); + async { + let creds = self.config.credentials.get(&self.config.endpoint); + debug!("creating request"); + let req = topic_request( + &self.config.http_client, + &self.config.endpoint, + &self.config.topic, + self.config.since, + creds.as_ref().map(|x| x.username.as_str()), + creds.as_ref().map(|x| x.password.as_str()), + ); - self.set_state(ConnectionState::Connected).await; + debug!("executing request"); + let res = self.config.http_client.execute(req?).await?; + let res = res.error_for_status()?; + let reader = tokio_util::io::StreamReader::new( + res.bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())), + ); + let stream = response_lines(reader).await?; + tokio::pin!(stream); - info!(topic = %&self.config.topic, "listening"); - while let Some(msg) = stream.next().await { - let msg = msg?; + self.set_state(ConnectionState::Connected).await; + info!("connection established"); - let min_msg = serde_json::from_str::(&msg) - .map_err(|e| Error::InvalidMinMessage(msg.to_string(), e))?; - self.config.since = min_msg.time.max(self.config.since); + info!(topic = %&self.config.topic, "listening"); + while let Some(msg) = stream.next().await { + let msg = msg?; - let event = serde_json::from_str(&msg) - .map_err(|e| Error::InvalidMessage(msg.to_string(), e))?; + let min_msg = serde_json::from_str::(&msg) + .map_err(|e| Error::InvalidMinMessage(msg.to_string(), e))?; + self.config.since = min_msg.time.max(self.config.since); - match event { - ServerEvent::Message(msg) => { - debug!("message event"); - self.event_tx - .send(ListenerEvent::Message(msg)) - .await - .unwrap(); - } - ServerEvent::KeepAlive { .. } => { - debug!("keepalive event"); - } - ServerEvent::Open { .. } => { - debug!("open event"); + let event = serde_json::from_str(&msg) + .map_err(|e| Error::InvalidMessage(msg.to_string(), e))?; + + match event { + ServerEvent::Message(msg) => { + debug!(id = %msg.id, "forwarding message"); + self.event_tx + .send(ListenerEvent::Message(msg)) + .await + .unwrap(); + } + ServerEvent::KeepAlive { id, .. } => { + debug!(id = %id, "received keepalive"); + } + ServerEvent::Open { id, .. } => { + debug!(id = %id, "received open event"); + } } } - } - Ok(()) + Ok(()) + } + .instrument(span) + .await } } @@ -323,13 +348,6 @@ mod tests { ListenerEvent::ConnectionStateChanged(ConnectionState::Connected { .. }), ] )); - - // assert!(matches!( - // listener, - // ListenerEvent::Error { .. }, - // ListenerEvent::Disconnected { .. }, - // ListenerEvent::Connected { .. }, - // )); }); local_set.await; } @@ -372,26 +390,4 @@ mod tests { }); local_set.await; } - - #[tokio::test] - async fn integration_connects_sends_receives_simple() { - let local_set = LocalSet::new(); - local_set.spawn_local(async { - let http_client = HttpClient::new(reqwest::Client::new()); - let credentials = Credentials::new_nullable(vec![]).await.unwrap(); - - let config = ListenerConfig { - http_client, - credentials, - endpoint: "http://localhost:8000".to_string(), - topic: "test".to_string(), - since: 0, - }; - - let listener = ListenerHandle::new(config.clone()); - - // assert_event_matches!(listener, ListenerEvent::Connected { .. },); - }); - local_set.await; - } } diff --git a/ntfy-daemon/src/models.rs b/ntfy-daemon/src/models.rs index e607905..1b01765 100644 --- a/ntfy-daemon/src/models.rs +++ b/ntfy-daemon/src/models.rs @@ -394,6 +394,6 @@ impl NullNetworkMonitor { impl NetworkMonitorProxy for NullNetworkMonitor { fn listen(&self) -> Pin>> { - todo!() + Box::pin(futures::stream::empty()) } }