Files
vrobbler/vrobbler/apps/scrobbles/tasks.py
Colin Powell 0639033aa9
All checks were successful
build & deploy / test (push) Successful in 1m57s
build & deploy / build-and-deploy (push) Successful in 30s
[tasks] Fix backup locations
2026-05-24 12:47:12 -04:00

519 lines
16 KiB
Python

import logging
from datetime import timedelta
from celery import shared_task
from charts.utils import (
build_charts_since,
build_daily_charts,
build_monthly_charts,
build_weekly_charts,
build_yearly_charts,
)
from django.apps import apps
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The user model configured for this Django project.
User = get_user_model()

# Media types handed to the chart builders by the chart tasks in this module.
CHARTABLE_MEDIA_TYPES = [
    "artist",
    "album",
    "track",
    "tv_series",
    "video",
    "podcast",
    "board_game",
    "trail",
    "food",
    "book",
]

# NOTE(review): presumably scrobble types that are deliberately excluded from
# charting — no task in this part of the module reads it; confirm usage.
SCROBBLES_WITHOUT_CHARTS = [
    "geolocation",
]
@shared_task
def check_twitch_channels_for_vods():
    """Check recent Twitch channel scrobbles for matching VODs.

    Considers channel scrobbles from the last 48 hours. For each scrobble
    whose channel has a ``twitch_id``, the channel's VODs are fetched and the
    first VOD published within 24 hours *after* the scrobble timestamp is
    loaded as a ``Video`` and scrobbled for the same user with status
    ``"stopped"`` and source ``"Twitch VOD"``.

    Any exception raised while handling one scrobble is logged as a warning
    and processing continues with the next scrobble.
    """
    # Imported inside the task body rather than at module import time.
    from scrobbles.models import Scrobble

    cutoff = timezone.now() - timedelta(hours=48)
    channel_scrobbles = Scrobble.objects.filter(
        media_type=Scrobble.MediaType.CHANNEL,
        timestamp__gte=cutoff,
    )
    logger.info(f"[twitch_vods] Checking {channel_scrobbles.count()} channel scrobbles")
    matched_count = 0
    for scrobble in channel_scrobbles:
        # Nothing to look up without a Twitch channel id.
        if not scrobble.channel or not scrobble.channel.twitch_id:
            continue
        try:
            from videos.sources.twitch import get_channel_vods

            vods = get_channel_vods(scrobble.channel.twitch_id)
            scrobble_time = scrobble.timestamp
            for vod in vods:
                if not vod.get("published_at"):
                    continue
                from videos.sources import twitch as twitch_source

                vod_time = twitch_source.parse_twitch_datetime(vod["published_at"])
                if not vod_time:
                    continue
                # Seconds from the scrobble to the VOD's publication.
                time_diff = (vod_time - scrobble_time).total_seconds()
                # Only VODs published after the scrobble, within 24 h (86400 s).
                if 0 < time_diff <= 86400:
                    from videos.models import Video

                    video = Video.get_from_twitch_id(vod["id"], overwrite=True)
                    if video:
                        video.scrobble_for_user(
                            scrobble.user_id,
                            status="stopped",
                            source="Twitch VOD",
                        )
                        matched_count += 1
                        logger.info(
                            f"[twitch_vods] Matched VOD {vod['id']} for channel scrobble {scrobble.id}"
                        )
                        # NOTE(review): assumed to stop at the first VOD that
                        # was successfully scrobbled — confirm whether the
                        # break should instead fire for any in-window VOD.
                        break
        except Exception as e:
            # Best-effort: one failing scrobble must not abort the whole run.
            logger.warning(
                f"[twitch_vods] Error processing scrobble {scrobble.id}: {e}"
            )
    logger.info(f"[twitch_vods] Matched {matched_count} VODs")
@shared_task
def process_retroarch_import(import_id):
    """Run the ``RetroarchImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    RetroarchImport = apps.get_model("scrobbles", "RetroarchImport")
    retroarch_import = RetroarchImport.objects.filter(id=import_id).first()
    if not retroarch_import:
        logger.warning("RetroarchImport not found with id %s", import_id)
        # Without this return the call below would hit None and raise
        # AttributeError.
        return
    retroarch_import.process()
@shared_task
def process_bgstats_import(import_id):
    """Run the ``BGStatsImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    BGStatsImport = apps.get_model("scrobbles", "BGStatsImport")
    bgstats_import = BGStatsImport.objects.filter(id=import_id).first()
    if not bgstats_import:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("BGStatsImport not found with id %s", import_id)
        return
    bgstats_import.process()
@shared_task
def process_lastfm_import(import_id):
    """Run the ``LastFMImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    LastFmImport = apps.get_model("scrobbles", "LastFMImport")
    lastfm_import = LastFmImport.objects.filter(id=import_id).first()
    if not lastfm_import:
        logger.warning("LastFmImport not found with id %s", import_id)
        # Without this return the call below would hit None and raise
        # AttributeError.
        return
    lastfm_import.process()
@shared_task
def process_tsv_import(import_id):
    """Run the ``AudioscrobblerTSVImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    model_path = "scrobbles.AudioscrobblerTSVImport"
    AudioScrobblerTSVImport = apps.get_model(model_path)
    tsv_import = AudioScrobblerTSVImport.objects.filter(id=import_id).first()
    if not tsv_import:
        logger.warning("AudioScrobblerTSVImport not found with id %s", import_id)
        # Without this return the call below would hit None and raise
        # AttributeError.
        return
    tsv_import.process()
@shared_task
def process_koreader_import(import_id):
    """Run the ``KoReaderImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    KoReaderImport = apps.get_model("scrobbles", "KoReaderImport")
    koreader_import = KoReaderImport.objects.filter(id=import_id).first()
    if not koreader_import:
        logger.warning("KOReaderImport not found with id %s", import_id)
        # Without this return the call below would hit None and raise
        # AttributeError.
        return
    koreader_import.process()
@shared_task
def process_trail_gpx_import(import_id):
    """Run the ``TrailGPXImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    TrailGPXImport = apps.get_model("scrobbles", "TrailGPXImport")
    trail_gpx_import = TrailGPXImport.objects.filter(id=import_id).first()
    if not trail_gpx_import:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("TrailGPXImport not found with id %s", import_id)
        return
    trail_gpx_import.process()
@shared_task
def process_ebird_csv_import(import_id):
    """Run the ``EBirdCSVImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    EBirdCSVImport = apps.get_model("scrobbles", "EBirdCSVImport")
    birding_import = EBirdCSVImport.objects.filter(id=import_id).first()
    if not birding_import:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("EBirdCSVImport not found with id %s", import_id)
        return
    birding_import.process()
@shared_task
def process_scale_csv_import(import_id):
    """Run the ``ScaleCSVImport`` identified by *import_id*.

    Logs a warning and returns without doing anything when no import row
    with that id exists.
    """
    ScaleCSVImport = apps.get_model("scrobbles", "ScaleCSVImport")
    scale_import = ScaleCSVImport.objects.filter(id=import_id).first()
    if not scale_import:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("ScaleCSVImport not found with id %s", import_id)
        return
    scale_import.process()
@shared_task
def create_yesterdays_charts():
    """Bring every user's charts up to date from their last known record."""
    every_user = User.objects.all()
    for account in every_user:
        build_charts_since(account)
@shared_task
def build_charts_for_user(user_id):
    """Bring one user's charts up to date from their last known record.

    Logs an error and does nothing when no user has the id *user_id*.
    """
    user = User.objects.filter(id=user_id).first()
    if user:
        logger.info(f"Building charts for {user}")
        build_charts_since(user)
        return
    logger.error(f"User with id {user_id} not found")
@shared_task
def rebuild_weekly_charts():
    """Rebuild weekly charts for all users for the just-completed week.

    The target week is the ISO week that contains the moment seven days
    before the task runs, mirroring how the monthly and yearly rebuild tasks
    step back one period.
    """
    # Using timezone.now() directly would select the week still in progress,
    # not the one that just ended. isocalendar() also yields the ISO year, so
    # a run in early January correctly targets week 52/53 of the prior year.
    # NOTE(review): assumes the task is scheduled shortly after the ISO week
    # rolls over — confirm against the beat schedule.
    last_week = timezone.now() - timedelta(weeks=1)
    year, week, _ = last_week.isocalendar()
    for user in User.objects.all():
        build_weekly_charts(user, year, week, CHARTABLE_MEDIA_TYPES)
    logger.info(f"Rebuilt weekly charts for week {week}, {year}")
@shared_task
def rebuild_monthly_charts():
    """Regenerate every user's monthly charts for the previous calendar month."""
    now = timezone.now()
    # January rolls back to December of the prior year.
    year, month = (
        (now.year - 1, 12) if now.month == 1 else (now.year, now.month - 1)
    )
    for account in User.objects.all():
        build_monthly_charts(account, year, month, CHARTABLE_MEDIA_TYPES)
    logger.info(f"Rebuilt monthly charts for {month}/{year}")
@shared_task
def rebuild_yearly_charts():
    """Regenerate every user's yearly charts for the previous calendar year."""
    year = timezone.now().year - 1
    for account in User.objects.all():
        build_yearly_charts(account, year, CHARTABLE_MEDIA_TYPES)
    logger.info(f"Rebuilt yearly charts for {year}")
@shared_task
def update_charts_for_timestamp(user_id, year, month, day, week):
    """Refresh the daily, weekly, monthly and yearly charts for one date.

    Logs an error and returns when the user does not exist. Any exception
    raised while building is logged as an error and not re-raised.
    """
    user = User.objects.filter(id=user_id).first()
    if not user:
        logger.error(f"User with id {user_id} not found")
        return

    try:
        # Builders paired with the period arguments each one expects, run in
        # the order daily -> weekly -> monthly -> yearly.
        steps = (
            (build_daily_charts, (year, month, day)),
            (build_weekly_charts, (year, week)),
            (build_monthly_charts, (year, month)),
            (build_yearly_charts, (year,)),
        )
        for build, period in steps:
            build(user, *period, CHARTABLE_MEDIA_TYPES)
        date_str = f"{year}-{month:02d}-{day:02d}"
        logger.info(f"[charts] Updated charts for {user} on {date_str}")
    except Exception as e:
        logger.error(f"[charts] Failed to update charts: {e}")
# ── Crontab replacements ──────────────────────────────────────────────────────

# Backups dated within this many days of "now" are always kept when pruning
# (see _retention_files_to_delete).
BACKUP_RETENTION_DAYS = 7
# Number of calendar months of monthly backups named by the retention policy
# documented on _retention_files_to_delete.
BACKUP_RETENTION_MONTHS = 12
def _cleanup_failed_backup(backup_path):
"""Remove a failed/incomplete backup file if it exists."""
from pathlib import Path
p = Path(backup_path)
if p.exists():
p.unlink()
logger.warning("backup_database: removed incomplete backup %s", backup_path)
def _retention_files_to_delete(remote_files, now):
"""Return list of filenames to delete under retention policy.
Policy: keep all files from the last *BACKUP_RETENTION_DAYS* days,
plus the single most-recent backup from each of the last
*BACKUP_RETENTION_MONTHS* calendar months (not already covered by
the day-based window).
"""
import re
from collections import defaultdict
from datetime import timedelta, date
pattern = re.compile(r"vrobbler-backup-(\d{4})_(\d{2})_(\d{2})\.sql\.gz")
parsed = []
for fname in remote_files:
m = pattern.match(fname)
if not m:
continue
y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
parsed.append((date(y, mo, d), fname))
cutoff = now.date() - timedelta(days=BACKUP_RETENTION_DAYS)
keep = set()
for d, fname in parsed:
if d >= cutoff:
keep.add(fname)
remaining = [(d, fname) for d, fname in parsed if fname not in keep]
by_month = defaultdict(list)
for d, fname in remaining:
by_month[(d.year, d.month)].append((d, fname))
for files in by_month.values():
files.sort(key=lambda x: x[0], reverse=True)
keep.add(files[0][1])
return [fname for _, fname in parsed if fname not in keep]
def _run_remote_cleanup(ssh_key, ssh_host, remote_path):
    """SSH to the remote host, list backup files, delete those outside retention.

    Pruning is best-effort: a failure to list or delete remote files (non-zero
    exit, timeout, ssh not runnable) is logged as a warning and never raised,
    so it cannot turn an otherwise successful backup into a failure.

    Args:
        ssh_key: Path to the private key passed to ``ssh -i``.
        ssh_host: ``user@host`` to connect to.
        remote_path: Remote directory that holds the backup files.
    """
    import shlex
    import subprocess
    from datetime import datetime

    try:
        result = subprocess.run(
            ["ssh", "-i", ssh_key, ssh_host, "ls", "-1", remote_path],
            capture_output=True, text=True, timeout=30,
        )
    except (subprocess.SubprocessError, OSError) as exc:
        logger.warning("backup_database: could not list remote files (%s)", exc)
        return
    if result.returncode != 0:
        logger.warning(
            "backup_database: could not list remote files (%s)", result.stderr.strip()
        )
        return

    files = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    if not files:
        return

    to_delete = _retention_files_to_delete(files, datetime.now())
    if not to_delete:
        logger.info("backup_database: no remote files to prune")
        return

    # Delete in batches to avoid absurdly long command lines
    batch_size = 50
    pruned = 0
    for start in range(0, len(to_delete), batch_size):
        batch = to_delete[start : start + batch_size]
        rm_cmd = ["ssh", "-i", ssh_key, ssh_host, "rm", "-f"]
        # ssh joins its arguments into one remote shell command line, so the
        # filename (which came from remote `ls` output) is shell-quoted.
        # remote_path is left as configured so forms such as "~/dir" still
        # expand on the remote side.
        rm_cmd.extend(f"{remote_path}/{shlex.quote(name)}" for name in batch)
        try:
            subprocess.run(rm_cmd, check=True, timeout=30)
        except (subprocess.SubprocessError, OSError) as exc:
            logger.warning("backup_database: remote prune failed (%s)", exc)
            break
        pruned += len(batch)

    logger.info(
        "backup_database: pruned %d remote backup(s)", pruned
    )
@shared_task
def backup_database():
    """pg_dump + gzip, scp to remote, retention cleanup, ntfy notification.

    Steps:
      1. Dump the default database with ``pg_dump | gzip`` into
         ``settings.DB_BACKUP_LOCAL_DIR`` as
         ``vrobbler-backup-YYYY_MM_DD.sql.gz``.
      2. When both ``DB_BACKUP_SSH_KEY`` and ``DB_BACKUP_SSH_DEST`` are set,
         ``scp`` the file there and prune old remote backups.
      3. POST a success message to ``DB_BACKUP_NTFY_URL``.

    The task is skipped when the default database engine is not
    PostgreSQL/PostGIS. If the dump or the copy fails, the local file is
    removed and the task returns. Failures in pruning or in sending the
    notification are logged as warnings only, because the backup itself has
    already succeeded at that point.
    """
    import os
    import re
    import subprocess
    from datetime import datetime
    from pathlib import Path

    import requests as req

    db = settings.DATABASES["default"]
    engine = db["ENGINE"]
    if "postgresql" not in engine and "postgis" not in engine:
        logger.warning("backup_database skipped — not PostgreSQL")
        return

    backup_dir = Path(settings.DB_BACKUP_LOCAL_DIR)
    backup_dir.mkdir(parents=True, exist_ok=True)
    date_str = datetime.now().strftime("%Y_%m_%d")
    backup_path = backup_dir / f"vrobbler-backup-{date_str}.sql.gz"

    # pg_dump reads the password from the environment, not from argv.
    env = os.environ.copy()
    if db.get("PASSWORD"):
        env["PGPASSWORD"] = db["PASSWORD"]

    pg_dump_cmd = [
        "pg_dump",
        "--no-blobs",
        "-h", db.get("HOST", "localhost"),
        "-U", db.get("USER", "postgres"),
        "-d", db["NAME"],
    ]
    if db.get("PORT"):
        # Honour a non-default port from the Django database settings.
        pg_dump_cmd.extend(["-p", str(db["PORT"])])

    logger.info("backup_database: dumping %s to %s", db["NAME"], backup_path)
    try:
        with open(backup_path, "wb") as f:
            dump_proc = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, env=env)
            gzip_proc = subprocess.Popen(
                ["gzip"], stdin=dump_proc.stdout, stdout=f
            )
            # Drop our handle on the pipe so pg_dump sees SIGPIPE if gzip
            # exits early.
            dump_proc.stdout.close()
            gzip_proc.communicate()
            # Reap pg_dump and capture its status: gzip happily compresses an
            # empty or truncated stream, so its exit code alone proves nothing.
            dump_returncode = dump_proc.wait()

        if dump_returncode != 0 or gzip_proc.returncode != 0:
            logger.error("backup_database: pg_dump / gzip failed")
            _cleanup_failed_backup(backup_path)
            return

        dump_size = backup_path.stat().st_size
        logger.info(
            "backup_database: dump complete (%.1f MB)", dump_size / 1_000_000
        )

        ssh_key = getattr(settings, "DB_BACKUP_SSH_KEY", "")
        ssh_dest = getattr(settings, "DB_BACKUP_SSH_DEST", "")
        if ssh_key and ssh_dest:
            logger.info("backup_database: copying to %s", ssh_dest)
            try:
                subprocess.run(
                    ["scp", "-i", ssh_key, str(backup_path), ssh_dest],
                    check=True,
                    capture_output=True,
                    text=True,
                )
            except subprocess.CalledProcessError as exc:
                logger.error(
                    "backup_database: scp failed (stderr: %s)", exc.stderr.strip()
                )
                _cleanup_failed_backup(backup_path)
                return
            logger.info("backup_database: copied to %s", ssh_dest)

            # Parse user@host and path from dest
            m = re.match(r"(\S+)@(\S+):(.+)", ssh_dest)
            if m:
                ssh_host = f"{m.group(1)}@{m.group(2)}"
                remote_path = m.group(3)
                logger.info("backup_database: pruning old remote backups")
                try:
                    _run_remote_cleanup(ssh_key, ssh_host, remote_path)
                except (subprocess.SubprocessError, OSError) as exc:
                    # The copy already succeeded; a prune problem must not
                    # reach the outer handler, which deletes the local file.
                    logger.warning(
                        "backup_database: remote prune failed: %s", exc
                    )
            else:
                logger.warning(
                    "backup_database: could not parse user@host:path from %s — "
                    "skipping remote prune",
                    ssh_dest,
                )
        else:
            logger.warning(
                "backup_database: DB_BACKUP_SSH_KEY and DB_BACKUP_SSH_DEST not set — "
                "backup saved locally at %s",
                backup_path,
            )

        ntfy_url = getattr(
            settings, "DB_BACKUP_NTFY_URL", "https://ntfy.unbl.ink/backups"
        )
        try:
            # requests has no default timeout; without one a stalled ntfy
            # server would hang the worker indefinitely.
            req.post(ntfy_url, data=b"Vrobbler backup succeeded", timeout=30)
        except req.RequestException as exc:
            logger.warning("backup_database: ntfy notification failed: %s", exc)

        logger.info("backup_database: completed %s", backup_path)
    except Exception as e:
        # Last-resort boundary for the task: log with traceback and remove
        # whatever partial file may have been written.
        logger.exception("backup_database failed: %s", e)
        _cleanup_failed_backup(backup_path)
@shared_task
def import_from_lastfm_all_users():
    """Run the Last.fm scrobble import for every user (replaces */30 cron)."""
    from vrobbler.apps.scrobbles.utils import (
        import_lastfm_for_all_users as run_lastfm_import,
    )

    run_lastfm_import()
@shared_task
def import_from_retroarch_all_users():
    """Run the RetroArch scrobble import for every user (replaces @daily cron).

    Deprecated: retroarch .lrtl files are now picked up by the WebDAV
    importer (scan_webdav_for_retroarch). The task is kept for manual runs
    and emits a ``DeprecationWarning`` before importing.
    """
    import warnings

    deprecation_note = (
        "import_from_retroarch_all_users is deprecated. "
        "Upload .lrtl files to WebDAV var/retroarch/ instead."
    )
    warnings.warn(deprecation_note, category=DeprecationWarning, stacklevel=2)

    from vrobbler.apps.scrobbles.utils import (
        import_retroarch_for_all_users as run_retroarch_import,
    )

    run_retroarch_import()
@shared_task
def import_from_webdav_all_users():
    """Run the WebDAV import for every user (replaces */2 cron)."""
    from vrobbler.apps.scrobbles.importers.webdav import (
        import_from_webdav_for_all_users as run_webdav_import,
    )

    run_webdav_import()
@shared_task
def import_from_imap_all_users():
    """Deprecated no-op: BG Stats files are now picked up from WebDAV var/bgstats/.

    Emits a ``DeprecationWarning`` and logs a warning; nothing is imported.
    """
    import warnings

    deprecation_note = (
        "IMAP import is deprecated. Upload .bgsplay files to WebDAV var/bgstats/ instead."
    )
    warnings.warn(deprecation_note, category=DeprecationWarning, stacklevel=2)
    logger.warning("Skipping deprecated IMAP import (use WebDAV var/bgstats/ instead)")
@shared_task
def import_from_lichess_all_users():
    """Run the Lichess chess-game import for every user (replaces */20 cron)."""
    from vrobbler.apps.boardgames.sources.lichess import (
        import_chess_games_for_all_users as run_lichess_import,
    )

    run_lichess_import()
@shared_task
def send_notification_for_in_progress():
    """Send ntfy stop-notifications for in-progress scrobbles (replaces */3 cron)."""
    from vrobbler.apps.scrobbles.utils import (
        send_stop_notifications_for_in_progress_scrobbles as notify_in_progress,
    )

    notify_in_progress()
@shared_task
def send_mood_checkin():
    """Send the mood check-in reminders (replaces @hourly cron)."""
    from vrobbler.apps.scrobbles.utils import (
        send_mood_checkin_reminders as send_reminders,
    )

    send_reminders()