[tasks] Move tasks out of cron and into celerybeat
This commit is contained in:
@ -10,6 +10,7 @@ from charts.utils import (
|
||||
build_yearly_charts,
|
||||
)
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils import timezone
|
||||
|
||||
@ -217,3 +218,216 @@ def update_charts_for_timestamp(user_id, year, month, day, week):
|
||||
logger.info(f"[charts] Updated charts for {user} on {date_str}")
|
||||
except Exception as e:
|
||||
logger.error(f"[charts] Failed to update charts: {e}")
|
||||
|
||||
|
||||
# ── Crontab replacements ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
BACKUP_RETENTION_DAYS = 7
|
||||
BACKUP_RETENTION_MONTHS = 12
|
||||
|
||||
|
||||
def _retention_files_to_delete(remote_files, now):
|
||||
"""Return list of filenames to delete under retention policy.
|
||||
|
||||
Policy: keep all files from the last *BACKUP_RETENTION_DAYS* days,
|
||||
plus the single most-recent backup from each of the last
|
||||
*BACKUP_RETENTION_MONTHS* calendar months (not already covered by
|
||||
the day-based window).
|
||||
"""
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from datetime import timedelta, date
|
||||
|
||||
pattern = re.compile(r"vrobbler-backup-(\d{4})_(\d{2})_(\d{2})\.sql\.gz")
|
||||
parsed = []
|
||||
for fname in remote_files:
|
||||
m = pattern.match(fname)
|
||||
if not m:
|
||||
continue
|
||||
y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
|
||||
parsed.append((date(y, mo, d), fname))
|
||||
|
||||
cutoff = now.date() - timedelta(days=BACKUP_RETENTION_DAYS)
|
||||
|
||||
keep = set()
|
||||
for d, fname in parsed:
|
||||
if d >= cutoff:
|
||||
keep.add(fname)
|
||||
|
||||
remaining = [(d, fname) for d, fname in parsed if fname not in keep]
|
||||
by_month = defaultdict(list)
|
||||
for d, fname in remaining:
|
||||
by_month[(d.year, d.month)].append((d, fname))
|
||||
|
||||
for files in by_month.values():
|
||||
files.sort(key=lambda x: x[0], reverse=True)
|
||||
keep.add(files[0][1])
|
||||
|
||||
return [fname for _, fname in parsed if fname not in keep]
|
||||
|
||||
|
||||
def _run_remote_cleanup(ssh_key, ssh_host, remote_path):
|
||||
"""SSH to remote host, list backup files, delete those outside retention."""
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
result = subprocess.run(
|
||||
["ssh", "-i", ssh_key, ssh_host, "ls", "-1", remote_path],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.warning(
|
||||
"backup_database: could not list remote files (%s)", result.stderr.strip()
|
||||
)
|
||||
return
|
||||
|
||||
files = [line.strip() for line in result.stdout.splitlines() if line.strip()]
|
||||
if not files:
|
||||
return
|
||||
|
||||
from datetime import datetime
|
||||
now = datetime.now()
|
||||
to_delete = _retention_files_to_delete(files, now)
|
||||
if not to_delete:
|
||||
logger.info("backup_database: no remote files to prune")
|
||||
return
|
||||
|
||||
# Delete in batches to avoid absurdly long command lines
|
||||
batch_size = 50
|
||||
for i in range(0, len(to_delete), batch_size):
|
||||
batch = to_delete[i : i + batch_size]
|
||||
rm_cmd = ["ssh", "-i", ssh_key, ssh_host, "rm", "-f"]
|
||||
rm_cmd.extend(f"{remote_path}/{f}" for f in batch)
|
||||
subprocess.run(rm_cmd, check=True, timeout=30)
|
||||
|
||||
logger.info(
|
||||
"backup_database: pruned %d remote backup(s)", len(to_delete)
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
def backup_database():
|
||||
"""pg_dump + gzip, scp to remote, retention cleanup, ntfy notification."""
|
||||
import os
|
||||
import subprocess
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import requests as req
|
||||
|
||||
db = settings.DATABASES["default"]
|
||||
engine = db["ENGINE"]
|
||||
if "postgresql" not in engine and "postgis" not in engine:
|
||||
logger.warning("backup_database skipped — not PostgreSQL")
|
||||
return
|
||||
|
||||
date_str = datetime.now().strftime("%Y_%m_%d")
|
||||
backup_path = Path(f"/tmp/vrobbler-backup-{date_str}.sql.gz")
|
||||
|
||||
env = os.environ.copy()
|
||||
if db.get("PASSWORD"):
|
||||
env["PGPASSWORD"] = db["PASSWORD"]
|
||||
|
||||
pg_dump_cmd = [
|
||||
"pg_dump",
|
||||
"-h", db.get("HOST", "localhost"),
|
||||
"-U", db.get("USER", "postgres"),
|
||||
"-d", db["NAME"],
|
||||
]
|
||||
|
||||
try:
|
||||
with open(backup_path, "wb") as f:
|
||||
dump_proc = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, env=env)
|
||||
gzip_proc = subprocess.Popen(
|
||||
["gzip"], stdin=dump_proc.stdout, stdout=f
|
||||
)
|
||||
dump_proc.stdout.close()
|
||||
gzip_proc.communicate()
|
||||
|
||||
if gzip_proc.returncode != 0:
|
||||
logger.error("backup_database: pg_dump / gzip failed")
|
||||
return
|
||||
|
||||
ssh_key = getattr(settings, "DB_BACKUP_SSH_KEY", "")
|
||||
ssh_dest = getattr(settings, "DB_BACKUP_SSH_DEST", "")
|
||||
if ssh_key and ssh_dest:
|
||||
subprocess.run(
|
||||
["scp", "-i", ssh_key, str(backup_path), ssh_dest],
|
||||
check=True,
|
||||
)
|
||||
logger.info("backup_database: copied to %s", ssh_dest)
|
||||
|
||||
# Parse user@host and path from dest
|
||||
m = re.match(r"(\S+)@(\S+):(.+)", ssh_dest)
|
||||
if m:
|
||||
ssh_host = f"{m.group(1)}@{m.group(2)}"
|
||||
remote_path = m.group(3)
|
||||
_run_remote_cleanup(ssh_key, ssh_host, remote_path)
|
||||
|
||||
ntfy_url = getattr(
|
||||
settings, "DB_BACKUP_NTFY_URL", "https://ntfy.unbl.ink/backups"
|
||||
)
|
||||
req.post(ntfy_url, data=b"Vrobbler backup succeeded")
|
||||
logger.info("backup_database: completed %s", backup_path)
|
||||
|
||||
backup_path.unlink(missing_ok=True)
|
||||
except Exception as e:
|
||||
logger.error("backup_database failed: %s", e)
|
||||
|
||||
|
||||
@shared_task
|
||||
def import_from_lastfm_all_users():
|
||||
"""Import Last.fm scrobbles for all users (replaces */30 cron)."""
|
||||
from scrobbles.utils import import_lastfm_for_all_users
|
||||
|
||||
import_lastfm_for_all_users()
|
||||
|
||||
|
||||
@shared_task
|
||||
def import_from_retroarch_all_users():
|
||||
"""Import RetroArch scrobbles for all users (replaces @daily cron)."""
|
||||
from scrobbles.utils import import_retroarch_for_all_users
|
||||
|
||||
import_retroarch_for_all_users()
|
||||
|
||||
|
||||
@shared_task
|
||||
def import_from_webdav_all_users():
|
||||
"""Import from WebDAV for all users (replaces */2 cron)."""
|
||||
from scrobbles.importers.webdav import import_from_webdav_for_all_users
|
||||
|
||||
import_from_webdav_for_all_users()
|
||||
|
||||
|
||||
@shared_task
|
||||
def import_from_imap_all_users():
|
||||
"""Import from IMAP for all users (replaces */4 cron)."""
|
||||
from scrobbles.importers.imap import import_scrobbles_from_imap
|
||||
|
||||
import_scrobbles_from_imap()
|
||||
|
||||
|
||||
@shared_task
|
||||
def import_from_lichess_all_users():
|
||||
"""Import chess games from Lichess for all users (replaces */20 cron)."""
|
||||
from boardgames.sources.lichess import import_chess_games_for_all_users
|
||||
|
||||
import_chess_games_for_all_users()
|
||||
|
||||
|
||||
@shared_task
|
||||
def send_notification_for_in_progress():
|
||||
"""Send ntfy stop-notifications for in-progress scrobbles (replaces */3 cron)."""
|
||||
from scrobbles.utils import send_stop_notifications_for_in_progress_scrobbles
|
||||
|
||||
send_stop_notifications_for_in_progress_scrobbles()
|
||||
|
||||
|
||||
@shared_task
|
||||
def send_mood_checkin():
|
||||
"""Send mood check-in reminders (replaces @hourly cron)."""
|
||||
from scrobbles.utils import send_mood_checkin_reminders
|
||||
|
||||
send_mood_checkin_reminders()
|
||||
|
||||
@ -126,6 +126,39 @@ CELERY_BEAT_SCHEDULE = {
|
||||
"task": "scrobbles.tasks.rebuild_yearly_charts",
|
||||
"schedule": crontab(hour=0, minute=30, day_of_month=1, month_of_year=1),
|
||||
},
|
||||
# ── Crontab replacements ─────────────────────────────────────────────
|
||||
"database-backup": {
|
||||
"task": "scrobbles.tasks.backup_database",
|
||||
"schedule": crontab(hour=0, minute=0),
|
||||
},
|
||||
"import-from-lastfm": {
|
||||
"task": "scrobbles.tasks.import_from_lastfm_all_users",
|
||||
"schedule": crontab(minute="*/30"),
|
||||
},
|
||||
"import-from-retroarch": {
|
||||
"task": "scrobbles.tasks.import_from_retroarch_all_users",
|
||||
"schedule": crontab(hour=0, minute=0),
|
||||
},
|
||||
"import-from-webdav": {
|
||||
"task": "scrobbles.tasks.import_from_webdav_all_users",
|
||||
"schedule": crontab(minute="*/2"),
|
||||
},
|
||||
"import-from-imap": {
|
||||
"task": "scrobbles.tasks.import_from_imap_all_users",
|
||||
"schedule": crontab(minute="*/4"),
|
||||
},
|
||||
"import-from-lichess": {
|
||||
"task": "scrobbles.tasks.import_from_lichess_all_users",
|
||||
"schedule": crontab(minute="*/20"),
|
||||
},
|
||||
"send-notification-for-in-progress": {
|
||||
"task": "scrobbles.tasks.send_notification_for_in_progress",
|
||||
"schedule": crontab(minute="*/3"),
|
||||
},
|
||||
"send-mood-checkin": {
|
||||
"task": "scrobbles.tasks.send_mood_checkin",
|
||||
"schedule": crontab(hour="*", minute=0),
|
||||
},
|
||||
}
|
||||
|
||||
INSTALLED_APPS = [
|
||||
|
||||
Reference in New Issue
Block a user