add tracing to listener

This commit is contained in:
ranfdev
2024-11-21 16:49:15 +01:00
parent dd5c334171
commit eac70af6a0
3 changed files with 120 additions and 124 deletions

View File

@ -1,13 +1,13 @@
macro_rules! send_command { macro_rules! send_command {
($self:expr, $command:expr) => {{ ($self:expr, $command:expr) => {{
let (resp_tx, rx) = oneshot::channel(); let (resp_tx, resp_rx) = oneshot::channel();
use anyhow::Context; use anyhow::Context;
$self $self
.command_tx .command_tx
.send($command(resp_tx)) .send($command(resp_tx))
.await .await
.context("Actor mailbox error")?; .context("Actor mailbox error")?;
rx.await.context("Actor response error")? resp_rx.await.context("Actor response error")?
}}; }};
} }

View File

@ -10,7 +10,7 @@ use tokio::{
sync::{mpsc, oneshot}, sync::{mpsc, oneshot},
}; };
use tokio_stream::wrappers::LinesStream; use tokio_stream::wrappers::LinesStream;
use tracing::{debug, error, info}; use tracing::{debug, error, info, warn, Instrument, Span};
use crate::credentials::Credentials; use crate::credentials::Credentials;
use crate::http_client::HttpClient; use crate::http_client::HttpClient;
@ -106,36 +106,43 @@ pub struct ListenerActor {
impl ListenerActor { impl ListenerActor {
pub async fn run_loop(mut self) { pub async fn run_loop(mut self) {
let mut commands_rx = self.commands_rx.take().unwrap(); let span = tracing::info_span!("listener_loop", topic = %self.config.topic);
loop { async {
select! { let mut commands_rx = self.commands_rx.take().unwrap();
_ = self.run_supervised_loop() => { loop {
// the supervised loop cannot fail. If it finished, don't restart. select! {
break; _ = self.run_supervised_loop() => {
}, info!("supervised loop ended");
cmd = commands_rx.recv() => { break;
match cmd { },
Some(ListenerCommand::Restart) => { cmd = commands_rx.recv() => {
info!("Received restart command"); match cmd {
continue; Some(ListenerCommand::Restart) => {
} info!("restarting listener");
Some(ListenerCommand::Shutdown) => { continue;
info!("Received shutdown command"); }
break; Some(ListenerCommand::Shutdown) => {
} info!("shutting down listener");
Some(ListenerCommand::GetState(tx)) => { break;
info!("Received get state command"); }
let state = self.state.clone(); Some(ListenerCommand::GetState(tx)) => {
let _ = tx.send(state); debug!("getting listener state");
} let state = self.state.clone();
None => { if tx.send(state).is_err() {
error!("Channel closed for ListenerActor"); warn!("failed to send state - receiver dropped");
break; }
}
None => {
error!("command channel closed");
break;
}
} }
} }
} }
} }
} }
.instrument(span)
.await;
} }
async fn set_state(&mut self, state: ConnectionState) { async fn set_state(&mut self, state: ConnectionState) {
@ -146,88 +153,106 @@ impl ListenerActor {
.unwrap(); .unwrap();
} }
async fn run_supervised_loop(&mut self) { async fn run_supervised_loop(&mut self) {
dbg!("supervised"); let span = tracing::info_span!("supervised_loop");
let retrier = || { async {
crate::retry::WaitExponentialRandom::builder() let retrier = || {
.min(Duration::from_secs(1)) crate::retry::WaitExponentialRandom::builder()
.max(Duration::from_secs(5 * 60)) .min(Duration::from_secs(1))
.build() .max(Duration::from_secs(5 * 60))
}; .build()
let mut retry = retrier(); };
loop { let mut retry = retrier();
let start_time = std::time::Instant::now(); loop {
let start_time = std::time::Instant::now();
if let Err(e) = self.recv_and_forward_loop().await { if let Err(e) = self.recv_and_forward_loop().await {
let uptime = std::time::Instant::now().duration_since(start_time); let uptime = std::time::Instant::now().duration_since(start_time);
// Reset retry delay to minimum if uptime was decent enough // Reset retry delay to minimum if uptime was decent enough
if uptime > Duration::from_secs(60 * 4) { if uptime > Duration::from_secs(60 * 4) {
retry = retrier(); debug!("resetting retry delay due to sufficient uptime");
retry = retrier();
}
error!(error = ?e, "connection error");
self.set_state(ConnectionState::Reconnecting {
retry_count: retry.count(),
delay: retry.next_delay(),
error: Some(Arc::new(e)),
})
.await;
info!(delay = ?retry.next_delay(), "waiting before reconnect attempt");
retry.wait().await;
} else {
break;
} }
error!(error = ?e);
self.set_state(ConnectionState::Reconnecting {
retry_count: retry.count(),
delay: retry.next_delay(),
error: Some(Arc::new(e)),
})
.await;
info!(delay = ?retry.next_delay(), "restarting");
retry.wait().await;
} else {
break;
} }
} }
.instrument(span)
.await;
} }
async fn recv_and_forward_loop(&mut self) -> anyhow::Result<()> { async fn recv_and_forward_loop(&mut self) -> anyhow::Result<()> {
let creds = self.config.credentials.get(&self.config.endpoint); let span = tracing::info_span!("receive_loop",
let req = topic_request( endpoint = %self.config.endpoint,
&self.config.http_client, topic = %self.config.topic,
&self.config.endpoint, since = %self.config.since
&self.config.topic,
self.config.since,
creds.as_ref().map(|x| x.username.as_str()),
creds.as_ref().map(|x| x.password.as_str()),
); );
let res = self.config.http_client.execute(req?).await?; async {
let res = res.error_for_status()?; let creds = self.config.credentials.get(&self.config.endpoint);
let reader = tokio_util::io::StreamReader::new( debug!("creating request");
res.bytes_stream() let req = topic_request(
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())), &self.config.http_client,
); &self.config.endpoint,
let stream = response_lines(reader).await?; &self.config.topic,
tokio::pin!(stream); self.config.since,
creds.as_ref().map(|x| x.username.as_str()),
creds.as_ref().map(|x| x.password.as_str()),
);
self.set_state(ConnectionState::Connected).await; debug!("executing request");
let res = self.config.http_client.execute(req?).await?;
let res = res.error_for_status()?;
let reader = tokio_util::io::StreamReader::new(
res.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())),
);
let stream = response_lines(reader).await?;
tokio::pin!(stream);
info!(topic = %&self.config.topic, "listening"); self.set_state(ConnectionState::Connected).await;
while let Some(msg) = stream.next().await { info!("connection established");
let msg = msg?;
let min_msg = serde_json::from_str::<models::MinMessage>(&msg) info!(topic = %&self.config.topic, "listening");
.map_err(|e| Error::InvalidMinMessage(msg.to_string(), e))?; while let Some(msg) = stream.next().await {
self.config.since = min_msg.time.max(self.config.since); let msg = msg?;
let event = serde_json::from_str(&msg) let min_msg = serde_json::from_str::<models::MinMessage>(&msg)
.map_err(|e| Error::InvalidMessage(msg.to_string(), e))?; .map_err(|e| Error::InvalidMinMessage(msg.to_string(), e))?;
self.config.since = min_msg.time.max(self.config.since);
match event { let event = serde_json::from_str(&msg)
ServerEvent::Message(msg) => { .map_err(|e| Error::InvalidMessage(msg.to_string(), e))?;
debug!("message event");
self.event_tx match event {
.send(ListenerEvent::Message(msg)) ServerEvent::Message(msg) => {
.await debug!(id = %msg.id, "forwarding message");
.unwrap(); self.event_tx
} .send(ListenerEvent::Message(msg))
ServerEvent::KeepAlive { .. } => { .await
debug!("keepalive event"); .unwrap();
} }
ServerEvent::Open { .. } => { ServerEvent::KeepAlive { id, .. } => {
debug!("open event"); debug!(id = %id, "received keepalive");
}
ServerEvent::Open { id, .. } => {
debug!(id = %id, "received open event");
}
} }
} }
}
Ok(()) Ok(())
}
.instrument(span)
.await
} }
} }
@ -323,13 +348,6 @@ mod tests {
ListenerEvent::ConnectionStateChanged(ConnectionState::Connected { .. }), ListenerEvent::ConnectionStateChanged(ConnectionState::Connected { .. }),
] ]
)); ));
// assert!(matches!(
// listener,
// ListenerEvent::Error { .. },
// ListenerEvent::Disconnected { .. },
// ListenerEvent::Connected { .. },
// ));
}); });
local_set.await; local_set.await;
} }
@ -372,26 +390,4 @@ mod tests {
}); });
local_set.await; local_set.await;
} }
#[tokio::test]
async fn integration_connects_sends_receives_simple() {
let local_set = LocalSet::new();
local_set.spawn_local(async {
let http_client = HttpClient::new(reqwest::Client::new());
let credentials = Credentials::new_nullable(vec![]).await.unwrap();
let config = ListenerConfig {
http_client,
credentials,
endpoint: "http://localhost:8000".to_string(),
topic: "test".to_string(),
since: 0,
};
let listener = ListenerHandle::new(config.clone());
// assert_event_matches!(listener, ListenerEvent::Connected { .. },);
});
local_set.await;
}
} }

View File

@ -394,6 +394,6 @@ impl NullNetworkMonitor {
impl NetworkMonitorProxy for NullNetworkMonitor { impl NetworkMonitorProxy for NullNetworkMonitor {
fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>> { fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>> {
todo!() Box::pin(futures::stream::empty())
} }
} }