[db] Fix database missing indexes
This commit is contained in:
1
ntfy-daemon/src/message_repo/migrations/01.sql
Normal file
1
ntfy-daemon/src/message_repo/migrations/01.sql
Normal file
@ -0,0 +1 @@
|
||||
CREATE INDEX IF NOT EXISTS message_server_topic_time ON message (server, topic, data ->> '$.time');
|
||||
3
ntfy-daemon/src/message_repo/migrations/02.sql
Normal file
3
ntfy-daemon/src/message_repo/migrations/02.sql
Normal file
@ -0,0 +1,3 @@
|
||||
ALTER TABLE message ADD COLUMN timestamp INTEGER AS (data ->> '$.time') STORED;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS message_server_topic_timestamp ON message (server, topic, timestamp);
|
||||
@ -27,10 +27,10 @@ impl Db {
|
||||
Ok(this)
|
||||
}
|
||||
fn migrate(&mut self) -> Result<()> {
|
||||
self.conn
|
||||
.read()
|
||||
.unwrap()
|
||||
.execute_batch(include_str!("./migrations/00.sql"))?;
|
||||
let conn = self.conn.read().unwrap();
|
||||
conn.execute_batch(include_str!("./migrations/00.sql"))?;
|
||||
conn.execute_batch(include_str!("./migrations/01.sql"))?;
|
||||
conn.execute_batch(include_str!("./migrations/02.sql"))?;
|
||||
Ok(())
|
||||
}
|
||||
fn get_or_insert_server(&mut self, server: &str) -> Result<i64> {
|
||||
@ -77,21 +77,41 @@ impl Db {
|
||||
server: &str,
|
||||
topic: &str,
|
||||
since: u64,
|
||||
limit: Option<u64>,
|
||||
offset: Option<u64>,
|
||||
) -> Result<Vec<String>, rusqlite::Error> {
|
||||
let conn = self.conn.read().unwrap();
|
||||
let mut stmt = conn.prepare(
|
||||
let mut query = String::from(
|
||||
"
|
||||
SELECT data
|
||||
FROM subscription sub
|
||||
JOIN server s ON sub.server = s.id
|
||||
JOIN message m ON m.server = sub.server AND m.topic = sub.topic
|
||||
WHERE s.endpoint = ?1 AND m.topic = ?2 AND m.data ->> 'time' >= ?3
|
||||
ORDER BY m.data ->> 'time'
|
||||
WHERE s.endpoint = ?1 AND m.topic = ?2 AND m.timestamp >= ?3
|
||||
ORDER BY m.timestamp
|
||||
",
|
||||
)?;
|
||||
let msgs: Result<Vec<String>, _> = stmt
|
||||
.query_map(params![server, topic, since], |row| row.get(0))?
|
||||
.collect();
|
||||
);
|
||||
if limit.is_some() {
|
||||
query.push_str(" LIMIT ?4");
|
||||
if offset.is_some() {
|
||||
query.push_str(" OFFSET ?5");
|
||||
}
|
||||
}
|
||||
let mut stmt = conn.prepare(&query)?;
|
||||
|
||||
let msgs: Result<Vec<String>, _> = if let (Some(limit), Some(offset)) = (limit, offset) {
|
||||
stmt.query_map(params![server, topic, since, limit, offset], |row| {
|
||||
row.get(0)
|
||||
})?
|
||||
.collect()
|
||||
} else if let Some(limit) = limit {
|
||||
stmt.query_map(params![server, topic, since, limit], |row| row.get(0))?
|
||||
.collect()
|
||||
} else {
|
||||
stmt.query_map(params![server, topic, since], |row| row.get(0))?
|
||||
.collect()
|
||||
};
|
||||
|
||||
msgs
|
||||
}
|
||||
pub fn insert_subscription(&mut self, sub: models::Subscription) -> Result<(), Error> {
|
||||
|
||||
@ -177,9 +177,9 @@ impl SubscriptionActor {
|
||||
debug!(topic=?self.model.topic, "attaching new listener");
|
||||
let messages = self
|
||||
.env
|
||||
.db
|
||||
.list_messages(&self.model.server, &self.model.topic, 0)
|
||||
.unwrap_or_default();
|
||||
.db
|
||||
.list_messages(&self.model.server, &self.model.topic, 0, None, None)
|
||||
.unwrap_or_default();
|
||||
let mut previous_events: Vec<ListenerEvent> = messages
|
||||
.into_iter()
|
||||
.filter_map(|msg| {
|
||||
|
||||
Reference in New Issue
Block a user