Files
vrobbler/vrobbler/apps/scrobbles/views.py
Colin Powell dccc80c615
Some checks failed
build / test (push) Has been cancelled
[discgolf] Fix tests and naming scheme
2026-06-20 00:55:16 -04:00

1973 lines
68 KiB
Python

import calendar
import json
import logging
from datetime import datetime, timedelta
from uuid import uuid4
import requests
import pendulum
import pytz
from dateutil.relativedelta import relativedelta
from django.apps import apps
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Count, F, Max, Q, Sum
from django.db.models.query import QuerySet
from rest_framework.authentication import TokenAuthentication
from rest_framework.authtoken.models import Token
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.permissions import IsAuthenticated
class BearerTokenAuthentication(TokenAuthentication):
    """``TokenAuthentication`` variant whose ``keyword`` is ``"Bearer"``.

    Only the ``keyword`` class attribute is overridden; everything else is
    inherited from the parent class unchanged.
    """

    keyword = "Bearer"
from django.http import (
FileResponse,
Http404,
HttpResponseRedirect,
JsonResponse,
)
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse_lazy
from django.utils import timezone
from django.utils.dateformat import DateFormat
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic import DetailView, FormView, TemplateView, View
from django.views.generic.edit import CreateView
from django.views.generic.list import ListView
from moods.models import Mood
from music.aggregators import (
artist_scrobble_count,
batch_live_charts,
live_charts,
live_tv_charts,
live_youtube_channel_charts,
scrobble_counts,
week_of_scrobbles,
)
from pendulum.parsing.exceptions import ParserError
from profiles.models import UserProfile
from profiles.utils import now_user_timezone
from rest_framework import status
from rest_framework.decorators import (
api_view,
parser_classes,
permission_classes,
)
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from scrobbles.api import serializers
from scrobbles.constants import (
LONG_PLAY_MEDIA,
MANUAL_SCROBBLE_FNS,
PLAY_AGAIN_MEDIA,
)
from scrobbles.export import export_scrobbles
from scrobbles.forms import ExportScrobbleForm, ScrobbleForm
from scrobbles.constants import Visibility
from scrobbles.sqids import decode_scrobble_share
from scrobbles.models import (
AudioScrobblerTSVImport,
BGStatsImport,
EBirdCSVImport,
FavoriteMedia,
KoReaderImport,
LastFmImport,
RetroarchImport,
ScaleCSVImport,
Scrobble,
ScrobbleQuerySet,
ShareViewLog,
TrailGPXImport,
UDiscCSVImport,
)
from scrobbles.scrobblers import *
from scrobbles.tasks import (
process_koreader_import,
process_lastfm_import,
process_trail_gpx_import,
process_tsv_import,
)
from scrobbles.utils import (
get_daily_calories_for_user_by_day,
get_long_plays_completed,
get_long_plays_in_progress,
)
# User model class as returned by Django's get_user_model().
User = get_user_model()
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class WebhookView(APIView):
    """Common base for the webhook endpoints defined in this module.

    Requests must be authenticated; credentials are checked with
    ``TokenAuthentication`` first and ``BearerTokenAuthentication`` second.
    """

    permission_classes = [IsAuthenticated]
    authentication_classes = [TokenAuthentication, BearerTokenAuthentication]

    @property
    def logger(self):
        """Expose the module-level logger to subclasses as ``self.logger``."""
        return logger
class ScrobbleableListView(ListView):
    """List media objects that have scrobbles, most recently scrobbled first.

    Subclasses are expected to set ``model``.
    """

    model = None
    paginate_by = 200

    def get_queryset(self):
        """Return media annotated with ``scrobble_count`` and ``last_scrobble``.

        For a signed-in user only that user's scrobbles are considered; for an
        anonymous visitor no user restriction is applied.
        """
        base = super().get_queryset()
        viewer = self.request.user
        # An empty Q() adds no condition to the query.
        owner_filter = Q() if viewer.is_anonymous else Q(scrobble__user=viewer)
        annotated = base.filter(owner_filter).annotate(
            scrobble_count=Count("scrobble", distinct=True),
            last_scrobble=Max("scrobble__timestamp"),
        )
        return annotated.filter(scrobble_count__gt=0).order_by("-last_scrobble")
class ChartContextMixin:
    """Add the detail object's top-three chart records to the context."""

    def get_context_data(self, **kwargs):
        """Extend the context with ``charts`` when the object type is charted.

        ``charts`` maps each ``period_type`` to the list of matching
        ``ChartRecord`` rows with rank 1-3 and no ``day`` set. The key is only
        added when the context has an ``object`` whose model appears in the
        mapping below.
        """
        context = super().get_context_data(**kwargs)
        # Imported locally, as in the original implementation.
        from charts.models import ChartRecord

        obj = context.get("object")
        if not obj:
            return context

        # "<app_label>.<model_name>" -> name of the ChartRecord field to filter on.
        chart_field_by_model = {
            "music.artist": "artist",
            "music.album": "album",
            "music.track": "track",
            "videos.video": "video",
            "videos.series": "tv_series",
            "podcasts.podcast": "podcast",
            "boardgames.boardgame": "board_game",
            "trails.trail": "trail",
            "foods.food": "food",
            "books.book": "book",
        }
        meta = obj._meta
        media_type = chart_field_by_model.get(f"{meta.app_label}.{meta.model_name}")
        if not media_type:
            return context

        podium_records = ChartRecord.objects.filter(
            **{media_type: obj}, rank__in=[1, 2, 3]
        ).exclude(day__isnull=False)

        from collections import OrderedDict

        by_period = OrderedDict()
        for record in podium_records:
            by_period.setdefault(record.period_type, []).append(record)
        context["charts"] = by_period
        return context
class ScrobbleableDetailView(ChartContextMixin, DetailView):
    """Detail page for a media object, with the viewer's paginated scrobbles.

    Subclasses are expected to set ``model``. Objects are looked up by their
    ``uuid`` field.
    """

    model = None
    slug_field = "uuid"
    paginate_by = 200  # Number of scrobbles shown per page

    def get_context_data(self, **kwargs):
        """Add pagination and long-play summary data to the context.

        Adds ``page_obj``, ``scrobbles`` and ``is_paginated`` for the viewer's
        scrobbles of this object (empty for anonymous visitors). For signed-in
        viewers of long-play media it also adds ``long_play_total_seconds``
        and ``long_play_finished_date``.
        """
        context_data = super().get_context_data(**kwargs)
        viewer = self.request.user
        signed_in = not viewer.is_anonymous

        scrobbles = []
        if signed_in and hasattr(self.object, "scrobble_set"):
            scrobbles = self.object.scrobble_set.filter(user=viewer).order_by(
                "-timestamp"
            )

        paginator = Paginator(scrobbles, self.paginate_by)
        page_number = self.request.GET.get("page")
        try:
            page_obj = paginator.page(page_number)
        except PageNotAnInteger:
            # Missing or non-numeric ?page= falls back to the first page.
            page_obj = paginator.page(1)
        except EmptyPage:
            # Out-of-range page numbers fall back to the last page.
            page_obj = paginator.page(paginator.num_pages)
        context_data["page_obj"] = page_obj
        context_data["scrobbles"] = page_obj.object_list
        context_data["is_paginated"] = paginator.num_pages > 1

        media = self.object
        # Fix: the long-play lookup filters by user, so it must be skipped for
        # anonymous visitors just like the scrobble list above.
        if (
            signed_in
            and hasattr(media, "is_long_play_media")
            and media.is_long_play_media()
        ):
            qs = media.scrobble_set.filter(user=viewer)
            completed = (
                qs.filter(long_play_complete=True).order_by("-timestamp").first()
            )
            if completed and completed.long_play_seconds:
                context_data["long_play_total_seconds"] = completed.long_play_seconds
                context_data["long_play_finished_date"] = completed.timestamp
            else:
                latest_finished = (
                    qs.filter(played_to_completion=True).order_by("-timestamp").first()
                )
                if latest_finished and latest_finished.long_play_seconds:
                    context_data["long_play_total_seconds"] = (
                        latest_finished.long_play_seconds
                    )
                    context_data["long_play_finished_date"] = None
        return context_data
class RecentScrobbleList(ListView):
    """Landing page: scrobbles for a day, week, month or year.

    Signed-in users see their own scrobbles grouped by media type for the
    period selected with the ``date`` query parameter. Anonymous visitors get
    the ``weekly_data`` and ``counts`` aggregates instead.
    """

    model = Scrobble

    def get(self, *args, **kwargs):
        """Render the list, or handle a ``?scrobble_url=`` request.

        When a signed-in user supplies ``scrobble_url``, the URL is passed to
        ``manual_scrobble_from_url`` (with the optional ``source`` and
        ``action`` parameters) and the response redirects to the resulting
        scrobble's ``redirect_url``.
        """
        user = self.request.user
        if user.is_authenticated:
            if scrobble_url := self.request.GET.get("scrobble_url", ""):
                action = self.request.GET.get("action", "")
                source = self.request.GET.get("source", "Bookmarklet")
                scrobble = manual_scrobble_from_url(
                    scrobble_url, self.request.user.id, source, action
                )
                return HttpResponseRedirect(scrobble.redirect_url(user.id))
        return super().get(*args, **kwargs)

    @staticmethod
    def _limit_per_type(scrobbles_dict, limit):
        """Truncate each per-type list in ``scrobbles_dict`` to ``limit`` items.

        Keys ending in ``_count`` or ``_time`` are left untouched. The dict is
        modified in place and also returned.
        """
        for key in list(scrobbles_dict.keys()):
            if not key.endswith(("_count", "_time")):
                scrobbles_dict[key] = scrobbles_dict[key][:limit]
        return scrobbles_dict

    def get_context_data(self, **kwargs):
        """Build the template context for the selected period.

        The ``date`` query parameter selects the period: ``this_week`` or a
        value containing ``-W`` (week), ``this_month`` or a value with one
        hyphen (month), ``today`` or a value with two hyphens (day),
        ``this_year`` or a value with no hyphen (year). Values that
        ``pendulum.parse`` rejects fall back to the current local time.
        """
        data = super().get_context_data(**kwargs)
        user = self.request.user
        data["date"] = ""
        if user.is_authenticated:
            self.queryset = self.get_queryset().filter(user=user)
            user_id = self.request.user.id

            # Per-type list limit from the user's profile (default 20).
            home_limit = 20
            if hasattr(user, "profile") and user.profile.home_scrobble_limit:
                home_limit = user.profile.home_scrobble_limit

            today = timezone.localtime(timezone.now())
            date_str = self.request.GET.get("date", "")
            date = today
            if date_str:
                try:
                    date = pendulum.parse(date_str)
                except ParserError:
                    # Keywords such as "today" are not parseable; keep `today`.
                    pass

            if date_str:
                if date_str == "this_week" or "-W" in date_str:
                    # NOTE(review): "next" jumps two weeks while "prev" jumps
                    # one; kept as-is, possibly compensating for %W vs ISO week
                    # numbering - confirm.
                    next_date = date + timedelta(weeks=+2)
                    prev_date = date + timedelta(weeks=-1)
                    # Fix: compare (ISO year, ISO week) so weeks from earlier
                    # years still get a "next" link.
                    if tuple(date.isocalendar())[:2] < tuple(today.isocalendar())[:2]:
                        data["next_link"] = f"?date={next_date.strftime('%Y-W%W')}"
                    data["prev_link"] = f"?date={prev_date.strftime('%Y-W%W')}"
                    data["title"] = f"Week {date.strftime('%-W')} of {date.year}"
                    scrobbles_dict = Scrobble.as_dict_by_type(
                        Scrobble.for_week(user_id, date.year, date.isocalendar()[1])
                    )
                    data = data | self._limit_per_type(scrobbles_dict, home_limit)
                elif date_str == "this_month" or date_str.count("-") == 1:
                    next_date = date + relativedelta(months=1)
                    prev_date = date + relativedelta(months=-1)
                    # Fix: compare (year, month) so months from earlier years
                    # still get a "next" link.
                    if (date.year, date.month) < (today.year, today.month):
                        data["next_link"] = f"?date={next_date.strftime('%Y-%m')}"
                    data["title"] = f"{date.strftime('%B %Y')}"
                    data["prev_link"] = f"?date={prev_date.strftime('%Y-%m')}"
                    scrobbles_dict = Scrobble.as_dict_by_type(
                        Scrobble.for_month(user_id, date.year, date.month)
                    )
                    data = data | self._limit_per_type(scrobbles_dict, home_limit)
                elif date_str == "today" or date_str.count("-") == 2:
                    next_date = date + timedelta(days=1)
                    prev_date = date - timedelta(days=1)
                    data["prev_link"] = f"?date={prev_date.strftime('%Y-%m-%d')}"
                    if date < today:
                        data["next_link"] = f"?date={next_date.strftime('%Y-%m-%d')}"
                    if date == today:
                        data["title"] = "Today"
                    else:
                        data["title"] = f"{date.strftime('%Y-%m-%d')}"
                    data["today_link"] = f"?date={today.strftime('%Y-%m-%d')}"
                    # The single-day view is not truncated.
                    data = data | Scrobble.as_dict_by_type(
                        Scrobble.for_day(user_id, date.year, date.month, date.day)
                    )
                elif date_str == "this_year" or date_str.count("-") == 0:
                    next_date = date + relativedelta(years=+1)
                    prev_date = date + relativedelta(years=-1)
                    data["next_link"] = ""
                    if date.year < today.year:
                        data["next_link"] = f"?date={next_date.year}"
                    data["title"] = f"{date.year}"
                    data["prev_link"] = f"?date={prev_date.year}"
                    scrobbles_dict = Scrobble.as_dict_by_type(
                        Scrobble.for_year(user_id, date.year)
                    )
                    data = data | self._limit_per_type(scrobbles_dict, home_limit)
            else:
                # No ?date= given: show today.
                prev_date = today - timedelta(days=1)
                data["title"] = "Today"
                data["next_link"] = ""
                data["prev_link"] = f"?date={prev_date.strftime('%Y-%m-%d')}"
                scrobbles_dict = Scrobble.as_dict_by_type(
                    Scrobble.for_day(user_id, date.year, date.month, date.day)
                )
                # Fix: honour the profile's limit instead of a hard-coded 20.
                data = data | self._limit_per_type(scrobbles_dict, home_limit)
                data["today_link"] = ""

            data["active_imports"] = AudioScrobblerTSVImport.objects.filter(
                processing_started__isnull=False,
                processed_finished__isnull=True,
                user=self.request.user,
            )
            data["counts"] = []
            data["daily_calories"] = get_daily_calories_for_user_by_day(
                self.request.user.id, date
            )
            data["life_events_in_progress"] = Scrobble.objects.filter(
                user=self.request.user,
                life_event__isnull=False,
                in_progress=True,
            ).order_by("-timestamp")
        else:
            data["weekly_data"] = week_of_scrobbles()
            data["counts"] = scrobble_counts()

        data["imdb_form"] = ScrobbleForm
        data["export_form"] = ExportScrobbleForm
        return data

    def get_queryset(self):
        """Return all scrobbles, newest first."""
        return Scrobble.objects.all().order_by("-timestamp")
class ScrobbleListView(LoginRequiredMixin, ListView):
    """Paginated list of all of the signed-in user's scrobbles.

    Supports ``?tags=a,b`` (scrobbles tagged with any of the names) and
    ``?visibility=public|shared|private`` filters, and marks scrobbles whose
    time ranges overlap with a shared CSS class.
    """

    model = Scrobble
    paginate_by = 100
    template_name = "scrobbles/scrobble_all_list.html"

    # CSS classes cycled through for successive overlap groups.
    _OVERLAP_CLASSES = [
        "border-start border-3 border-info",
        "border-start border-3 border-success",
        "border-start border-3 border-warning",
        "border-start border-3 border-primary",
        "border-start border-3 border-danger",
        "border-start border-3 border-secondary",
    ]

    def get_queryset(self):
        """Return the user's scrobbles, newest first, with filters applied.

        Also stores the parsed tag names on ``self.tag_list`` and the filtered
        queryset on ``self._full_queryset`` for use in ``get_context_data``.
        """
        qs = Scrobble.objects.filter(user=self.request.user).order_by("-timestamp")

        tags_param = self.request.GET.get("tags", "")
        tag_list = [t.strip() for t in tags_param.split(",") if t.strip()]
        if tag_list:
            from django.contrib.contenttypes.models import ContentType
            from taggit.models import TaggedItem

            scrobble_ct = ContentType.objects.get_for_model(Scrobble)
            matching_ids = list(
                TaggedItem.objects.filter(
                    content_type=scrobble_ct,
                    tag__name__in=tag_list,
                ).values_list("object_id", flat=True)
            )
            qs = qs.filter(id__in=matching_ids).distinct()

        visibility_param = self.request.GET.get("visibility", "")
        if visibility_param in ("public", "shared", "private"):
            qs = qs.filter(visibility=visibility_param)

        self.tag_list = tag_list
        self._full_queryset = qs
        return qs

    def _compute_overlap_groups(self, scrobbles):
        """Map scrobble pk -> CSS class for scrobbles that overlap in time.

        Two scrobbles overlap when their ``[timestamp, stop_timestamp]`` ranges
        intersect; overlap is transitive (union-find). A scrobble without a
        ``stop_timestamp`` is treated as lasting 12 hours. Scrobbles that
        overlap nothing are absent from the result.
        """
        parent = {s.pk: s.pk for s in scrobbles}

        def find(x):
            # Path halving keeps the trees shallow.
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x

        def union(x, y):
            rx, ry = find(x), find(y)
            if rx != ry:
                parent[rx] = ry

        def range_for(s):
            start = s.timestamp
            end = s.stop_timestamp
            if end is None:
                # Fix: the original wrote `datetime.timedelta`, which is an
                # attribute lookup on the imported `datetime` class; it always
                # raised AttributeError and silently fell back to `start`.
                end = start + timedelta(hours=12)
            return start, end

        ranges = [range_for(s) for s in scrobbles]
        for i, a in enumerate(scrobbles):
            a_start, a_end = ranges[i]
            for j in range(i + 1, len(scrobbles)):
                b_start, b_end = ranges[j]
                if a_start <= b_end and b_start <= a_end:
                    union(a.pk, scrobbles[j].pk)

        groups = {}
        for s in scrobbles:
            groups.setdefault(find(s.pk), []).append(s.pk)

        overlap_map = {}
        color_idx = 0
        colors = self._OVERLAP_CLASSES
        for members in groups.values():
            if len(members) >= 2:
                color = colors[color_idx % len(colors)]
                for pk in members:
                    overlap_map[pk] = color
                color_idx += 1
        return overlap_map

    def get_context_data(self, **kwargs):
        """Add tag info, the overlap map and (when filtering by tag) total time.

        ``total_time_seconds`` is the sum of ``playback_position_seconds`` over
        the whole filtered queryset, not just the current page.
        """
        ctx = super().get_context_data(**kwargs)
        ctx["tags_param"] = self.request.GET.get("tags", "")
        ctx["tag_list"] = getattr(self, "tag_list", [])
        scrobbles = list(ctx.get("object_list", []))
        ctx["overlap_map"] = self._compute_overlap_groups(scrobbles)
        full_qs = getattr(self, "_full_queryset", None)
        if full_qs is not None and getattr(self, "tag_list", []):
            total = (
                full_qs.aggregate(total=Sum("playback_position_seconds"))["total"] or 0
            )
            ctx["total_time_seconds"] = total
        return ctx
class ScrobbleLongPlaysView(TemplateView):
    """Page listing the requesting user's long plays."""

    template_name = "scrobbles/long_plays_in_progress.html"

    def get_context_data(self, **kwargs):
        """Add the display mode plus in-progress and completed long plays.

        ``view`` comes from the ``view`` query parameter (default ``"grid"``).
        """
        context_data = super().get_context_data(**kwargs)
        requester = self.request.user
        context_data.update(
            view=self.request.GET.get("view", "grid"),
            in_progress=get_long_plays_in_progress(requester),
            completed=get_long_plays_completed(requester),
        )
        return context_data
class ScrobbleImportListView(TemplateView):
    """Overview page showing the user's ten latest imports of each kind."""

    template_name = "scrobbles/import_list.html"

    def get_context_data(self, **kwargs):
        """Add one ``*_imports`` entry per import model to the context.

        Each entry holds at most ten rows owned by the requesting user,
        ordered by ``processing_started`` descending.
        """
        context_data = super().get_context_data(**kwargs)
        context_data["object_list"] = []

        # (context key, import model) pairs, in the order they are added.
        import_sources = (
            ("tsv_imports", AudioScrobblerTSVImport),
            ("koreader_imports", KoReaderImport),
            ("lastfm_imports", LastFmImport),
            ("retroarch_imports", RetroarchImport),
            ("bgstats_imports", BGStatsImport),
            ("birding_csv_imports", EBirdCSVImport),
            ("scale_csv_imports", ScaleCSVImport),
            ("trail_gpx_imports", TrailGPXImport),
        )
        for context_key, import_model in import_sources:
            latest = import_model.objects.filter(
                user=self.request.user,
            ).order_by("-processing_started")
            context_data[context_key] = latest[:10]
        return context_data
class BaseScrobbleImportDetailView(DetailView):
    """Shared detail view for import records, looked up by ``uuid``.

    Subclasses set ``model``; only rows owned by the requesting user are
    visible.
    """

    slug_field = "uuid"
    template_name = "scrobbles/import_detail.html"

    def get_queryset(self):
        """Restrict the queryset to imports owned by the requesting user."""
        owned = super().get_queryset().filter(user=self.request.user)
        return owned

    def get_context_data(self, **kwargs):
        """Add a human-readable ``title`` chosen by the concrete model."""
        context_data = super().get_context_data(**kwargs)
        titles_by_model = {
            KoReaderImport: "KoReader Import",
            AudioScrobblerTSVImport: "Audioscrobbler TSV Import",
            LastFmImport: "LastFM Import",
            RetroarchImport: "Retroarch Import",
            BGStatsImport: "BG Stats Import",
            EBirdCSVImport: "eBird CSV Import",
            ScaleCSVImport: "Scale CSV Import",
            TrailGPXImport: "Trail GPX Import",
            UDiscCSVImport: "uDisc CSV Import",
        }
        # Models not listed above get the generic title.
        context_data["title"] = titles_by_model.get(
            self.model, "Generic Scrobble Import"
        )
        return context_data
class ScrobbleKoReaderImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``KoReaderImport``."""

    model = KoReaderImport
class ScrobbleTSVImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``AudioScrobblerTSVImport``."""

    model = AudioScrobblerTSVImport
class ScrobbleLastFMImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``LastFmImport``."""

    model = LastFmImport
class ScrobbleRetroarchImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``RetroarchImport``."""

    model = RetroarchImport
class ScrobbleBGStatsImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``BGStatsImport``."""

    model = BGStatsImport
class ScrobbleScaleCSVImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``ScaleCSVImport``."""

    model = ScaleCSVImport
class ScrobbleTrailGPXImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``TrailGPXImport``."""

    model = TrailGPXImport
class ScrobbleBirdingCSVImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``EBirdCSVImport``."""

    model = EBirdCSVImport
class ScrobbleUDiscCSVImportDetailView(BaseScrobbleImportDetailView):
    """Import detail view bound to ``UDiscCSVImport``."""

    model = UDiscCSVImport
class ManualScrobbleView(FormView):
    """Form view that scrobbles a media item from a prefixed identifier."""

    form_class = ScrobbleForm
    template_name = "scrobbles/manual_form.html"

    def form_valid(self, form):
        """Dispatch the submitted ``item_id`` to the matching scrobble function.

        The first two characters of ``item_id`` select an entry in
        ``MANUAL_SCROBBLE_FNS``; everything from the fourth character on is
        passed to that function together with the user's id. An unknown prefix
        re-renders the form with an error instead of raising ``KeyError``.
        """
        # Treat a missing value like an empty one so slicing cannot fail.
        item_str = form.cleaned_data.get("item_id") or ""
        logger.debug(f"Looking for scrobblable media with input {item_str}")
        # Index 2 is skipped (presumably a separator character - confirm).
        key, item_id = item_str[:2], item_str[3:]
        try:
            scrobble_fn = MANUAL_SCROBBLE_FNS[key]
        except KeyError:
            form.add_error(None, f"Unrecognized media prefix: {key!r}")
            return self.form_invalid(form)
        # NOTE(security): eval() resolves a function name taken from the
        # MANUAL_SCROBBLE_FNS constant, never from the submitted text itself.
        # Keep that mapping free of user-controlled values.
        scrobble = eval(scrobble_fn)(item_id, self.request.user.id)
        return HttpResponseRedirect(scrobble.redirect_url(self.request.user.id))
class JsonableResponseMixin:
    """
    Mixin to add JSON support to a form.
    Must be used with an object-based FormView (e.g. CreateView)
    """

    def form_invalid(self, form):
        """Return the normal response for HTML clients, else errors as JSON."""
        html_response = super().form_invalid(form)
        if not self.request.accepts("text/html"):
            return JsonResponse(form.errors, status=400)
        return html_response

    def form_valid(self, form):
        """Return the normal response for HTML clients, else the pk as JSON."""
        # The parent's form_valid() is always called first because it may do
        # real work (for CreateView it calls form.save(), for example).
        html_response = super().form_valid(form)
        if not self.request.accepts("text/html"):
            return JsonResponse({"pk": self.object.pk})
        return html_response
class AudioScrobblerImportCreateView(
    LoginRequiredMixin, JsonableResponseMixin, CreateView
):
    """Upload form for Audioscrobbler TSV files."""

    model = AudioScrobblerTSVImport
    fields = ["tsv_file"]
    template_name = "scrobbles/upload_form.html"
    success_url = reverse_lazy("vrobbler-home")

    def form_valid(self, form):
        """Save the upload for the current user and queue its processing.

        Redirects back to the referring page, or to ``success_url`` when the
        request has no Referer header.
        """
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        self.object.save()
        process_tsv_import.delay(self.object.id)
        # Fix: the Referer header is optional; without the fallback the
        # redirect target would be None.
        return HttpResponseRedirect(
            self.request.META.get("HTTP_REFERER") or self.success_url
        )
class KoReaderImportCreateView(LoginRequiredMixin, JsonableResponseMixin, CreateView):
    """Upload form for KoReader sqlite files."""

    model = KoReaderImport
    fields = ["sqlite_file"]
    template_name = "scrobbles/upload_form.html"
    success_url = reverse_lazy("vrobbler-home")

    def form_valid(self, form):
        """Save the upload for the current user and queue its processing.

        Redirects back to the referring page, or to ``success_url`` when the
        request has no Referer header.
        """
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        self.object.save()
        process_koreader_import.delay(self.object.id)
        # Fix: the Referer header is optional; without the fallback the
        # redirect target would be None.
        return HttpResponseRedirect(
            self.request.META.get("HTTP_REFERER") or self.success_url
        )
class ScaleCSVImportCreateView(LoginRequiredMixin, JsonableResponseMixin, CreateView):
    """Upload form for scale CSV files."""

    model = ScaleCSVImport
    fields = ["csv_file"]
    template_name = "scrobbles/upload_form.html"
    success_url = reverse_lazy("vrobbler-home")

    def form_valid(self, form):
        """Save the upload for the current user and process it in-request.

        Unlike the other import views this calls ``process()`` directly rather
        than queueing a task. Redirects back to the referring page, or to
        ``success_url`` when the request has no Referer header.
        """
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        self.object.original_filename = form.cleaned_data["csv_file"].name
        self.object.save()
        self.object.process()
        # Fix: the Referer header is optional; without the fallback the
        # redirect target would be None.
        return HttpResponseRedirect(
            self.request.META.get("HTTP_REFERER") or self.success_url
        )
class TrailGPXImportCreateView(LoginRequiredMixin, JsonableResponseMixin, CreateView):
    """Upload form for trail GPX files."""

    model = TrailGPXImport
    fields = ["gpx_file"]
    template_name = "scrobbles/upload_form.html"
    success_url = reverse_lazy("vrobbler-home")

    def form_valid(self, form):
        """Save the upload for the current user and queue its processing.

        Redirects back to the referring page, or to ``success_url`` when the
        request has no Referer header.
        """
        self.object = form.save(commit=False)
        self.object.user = self.request.user
        self.object.original_filename = form.cleaned_data["gpx_file"].name
        self.object.save()
        process_trail_gpx_import.delay(self.object.id)
        # Fix: the Referer header is optional; without the fallback the
        # redirect target would be None.
        return HttpResponseRedirect(
            self.request.META.get("HTTP_REFERER") or self.success_url
        )
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def lastfm_import(request):
    """Queue a Last.fm import for the requesting user.

    Reuses the user's unfinished ``LastFmImport`` row if one exists, otherwise
    creates one, then queues ``process_lastfm_import``. Redirects back to the
    referring page, or to the home page when no Referer header was sent.
    """
    lfm_import, _created = LastFmImport.objects.get_or_create(
        user=request.user, processed_finished__isnull=True
    )
    process_lastfm_import.delay(lfm_import.id)
    # Fix: the Referer header is optional; without the fallback the redirect
    # target would be None.
    return HttpResponseRedirect(
        request.META.get("HTTP_REFERER") or reverse_lazy("vrobbler-home")
    )
class WebScrobblerWebhookView(WebhookView):
    """Webhook endpoint for the Web Scrobbler browser extension."""

    def post(self, request):
        """Note, this does not work as implemented. For some reason the web
        scrobbler tool kicks back a bad response code and does not seem to POST data
        as expected.
        """
        user_id = request.user.id
        self.logger.info(
            "[web_scrobbler_webhook] called",
            extra={"post_data": request.POST, "user_id": user_id},
        )

        youtube_id = request.POST.get("uniqueID")
        # Both failure paths log the same fields.
        failure_extra = {"youtube_id": youtube_id, "user_id": user_id}

        if not youtube_id:
            self.logger.warning("[web_scrobbler_webhook] failed", extra=failure_extra)
            # Errors are reported with HTTP 200, as in the original.
            return JsonResponse(
                {"errors": ["No youtube ID provided"]}, status=status.HTTP_200_OK
            )

        is_stopped = request.POST.get("isPlaying") == "false"
        playing_state = "stopped" if is_stopped else "started"

        scrobble = web_scrobbler_scrobble_media(
            youtube_id, user_id, status=playing_state
        )
        if not scrobble:
            self.logger.warning("[web_scrobbler_webhook] failed", extra=failure_extra)
            return JsonResponse(
                {"errors": ["No scrobble found for user or video"]},
                status=status.HTTP_200_OK,
            )

        scrobble_id = scrobble.id
        self.logger.info(
            "[web_scrobbler_webhook] finished",
            extra={"scrobble_id": scrobble_id},
        )
        return JsonResponse({"scrobble_id": scrobble_id}, status=status.HTTP_200_OK)
class JellyfinWebhookView(WebhookView):
    """Webhook endpoint receiving Jellyfin playback notifications."""

    def post(self, request):
        """Scrobble the posted Jellyfin event for the authenticated user.

        Progress updates for audio items are ignored with a 304 response.
        Returns 400 when no scrobble results, otherwise 200 with the
        scrobble id.
        """
        payload = request.data
        self.logger.info(
            "[jellyfin_webhook] called",
            extra={"post_data": payload, "user_id": request.user.id},
        )

        is_progress_update = payload.get("NotificationType", "") == "PlaybackProgress"
        is_audio = payload.get("ItemType", "") == "Audio"
        if is_progress_update and is_audio:
            self.logger.info(
                "[jellyfin_webhook] ignoring update of music in progress",
                extra={"post_data": payload},
            )
            return Response({}, status=status.HTTP_304_NOT_MODIFIED)

        scrobble = jellyfin_scrobble_media(payload, request.user.id)
        if not scrobble:
            return Response({}, status=status.HTTP_400_BAD_REQUEST)

        scrobble_id = scrobble.id
        self.logger.info(
            "[jellyfin_webhook] finished",
            extra={"scrobble_id": scrobble_id},
        )
        return Response({"scrobble_id": scrobble_id}, status=status.HTTP_200_OK)
class MopidyWebhookView(WebhookView):
    """Webhook endpoint receiving Mopidy playback events."""

    def post(self, request):
        """Scrobble the posted Mopidy event for the authenticated user.

        Returns 400 when no scrobble results, otherwise 200 with the
        scrobble id.
        """
        payload = request.data
        try:
            # Decode the body when it is JSON text.
            payload = json.loads(payload)
        except TypeError:
            # json.loads() rejected the type; use the data as-is.
            pass

        self.logger.info(
            "[mopidy_webhook] called",
            extra={"post_data": payload, "user_id": request.user.id},
        )

        scrobble = mopidy_scrobble_media(payload, request.user.id)
        if scrobble:
            return Response({"scrobble_id": scrobble.id}, status=status.HTTP_200_OK)
        return Response({}, status=status.HTTP_400_BAD_REQUEST)
class GPSWebhookView(WebhookView):
    """Webhook endpoint receiving location updates."""

    def post(self, request):
        """Scrobble the posted location for the authenticated user.

        Always answers 200; the body carries the scrobble id only when a
        scrobble resulted.
        """
        payload = request.data
        try:
            # Decode the body when it is JSON text.
            payload = json.loads(payload)
        except TypeError:
            # json.loads() rejected the type; use the data as-is.
            pass

        self.logger.info(
            "[geolocation_webhook] called",
            extra={"post_data": payload, "user_id": request.user.id},
        )

        scrobble = gpslogger_scrobble_location(payload, request.user.id)
        body = {"scrobble_id": scrobble.id} if scrobble else {}
        return Response(body, status=status.HTTP_200_OK)
@api_view(["POST"])
@parser_classes([MultiPartParser])
@permission_classes([IsAuthenticated])
def import_audioscrobbler_file(request):
    """Validate and save an uploaded Audioscrobbler TSV file.

    Returns 400 with the serializer errors for invalid uploads. On success
    the import is saved and an empty ``scrobble_ids`` list is returned; this
    view does not start any processing itself.
    """
    file_serializer = serializers.AudioScrobblerTSVImportSerializer(data=request.data)
    if not file_serializer.is_valid():
        return Response(file_serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    file_serializer.save()
    return Response({"scrobble_ids": []}, status=status.HTTP_200_OK)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def scrobble_start(request, media_uuid):
    """Start a scrobble of the media object identified by ``media_uuid``.

    The uuid is looked up in each model listed in ``PLAY_AGAIN_MEDIA``. With
    ``?resume`` on a book, the new scrobble's log starts on the page after the
    last unfinished scrobble's ``page_end``. Redirects to the media's ``url``
    for web pages and books when the profile asks for it, otherwise back to
    the referring page (or the home page when no Referer header was sent).

    Raises:
        Exception: when no media object matches ``media_uuid``.
    """
    logger.info(
        "[scrobble_start] called",
        extra={"request": request, "media_uuid": media_uuid},
    )
    user = request.user
    # Fix: the Referer header is optional; fall back to the home page.
    success_url = request.META.get("HTTP_REFERER") or reverse_lazy("vrobbler-home")

    if not user.is_authenticated:
        return HttpResponseRedirect(success_url)

    media_obj = None
    for app, model in PLAY_AGAIN_MEDIA.items():
        media_model = apps.get_model(app_label=app, model_name=model)
        media_obj = media_model.objects.filter(uuid=media_uuid).first()
        if media_obj:
            break

    if not media_obj:
        logger.info(
            "[scrobble_start] media object not found",
            extra={"media_uuid": media_uuid, "user_id": user.id},
        )
        raise Exception("No media object provided to scrobble")

    user_id = request.user.id
    media_class_name = media_obj.__class__.__name__

    log_data = None
    if request.GET.get("resume") and media_class_name == Scrobble.MediaType.BOOK:
        last_scrobble = (
            Scrobble.objects.filter(
                book=media_obj,
                user_id=user_id,
            )
            .filter(Q(long_play_complete=False) | Q(long_play_complete__isnull=True))
            .filter(log__page_end__isnull=False)
            .order_by("-timestamp")
            .first()
        )
        if last_scrobble and last_scrobble.logdata:
            next_page = last_scrobble.logdata.page_end + 1
            log_data = {"page_start": next_page}

    # Fix: keep the result. The original discarded it, so `scrobble` stayed
    # None and the error message below was shown even on success.
    scrobble = media_obj.scrobble_for_user(user_id, log=log_data)

    if scrobble:
        messages.add_message(
            request,
            messages.SUCCESS,
            f"Scrobble of {scrobble.media_obj} started.",
        )
    else:
        messages.add_message(
            request, messages.ERROR, f"Media with uuid {media_uuid} not found."
        )

    if (
        user.profile.redirect_to_webpage
        and (
            media_class_name == Scrobble.MediaType.WEBPAGE
            or media_class_name == Scrobble.MediaType.BOOK
        )
        and hasattr(media_obj, "url")
    ):
        logger.info(f"Redirecting to {media_obj} detail page")
        return HttpResponseRedirect(media_obj.url)

    return HttpResponseRedirect(success_url)
def _toggle_long_play_complete(request, scrobble, label):
    """Flip ``scrobble.long_play_complete`` and flash a matching message.

    A completed long play is reset to ``None``; anything else becomes ``True``.
    ``label`` is the object named in the user-facing message.
    """
    if scrobble.long_play_complete == True:  # noqa: E712 - mirrors original check
        scrobble.long_play_complete = None
        level = messages.INFO
        text = f"Long play of {label} marked as not complete."
    else:
        scrobble.long_play_complete = True
        level = messages.SUCCESS
        text = f"Long play of {label} finished."
    scrobble.save(update_fields=["long_play_complete"])
    messages.add_message(request, level, text)


@api_view(["GET"])
def scrobble_longplay_finish(request, media_uuid):
    """Toggle the long-play completion flag for a scrobble or media object.

    ``media_uuid`` is first tried as the uuid of one of the user's scrobbles.
    Failing that, it is looked up in each ``LONG_PLAY_MEDIA`` model and the
    user's last long-play scrobble of that media is toggled. Always redirects
    back to the referring page (or the home page when no Referer header was
    sent).
    """
    user = request.user
    # Fix: the Referer header is optional; fall back to the home page.
    success_url = request.META.get("HTTP_REFERER") or reverse_lazy("vrobbler-home")

    if not user.is_authenticated:
        return HttpResponseRedirect(success_url)

    # Try scrobble UUID first
    scrobble = Scrobble.objects.filter(uuid=media_uuid, user=user).first()
    if scrobble:
        _toggle_long_play_complete(request, scrobble, scrobble.media_obj)
        return HttpResponseRedirect(success_url)

    # Fall back to media UUID (existing behavior)
    media_obj = None
    for app, model in LONG_PLAY_MEDIA.items():
        media_model = apps.get_model(app_label=app, model_name=model)
        media_obj = media_model.objects.filter(uuid=media_uuid).first()
        if media_obj:
            break

    last_scrobble = (
        media_obj.last_long_play_scrobble_for_user(user) if media_obj else None
    )
    if last_scrobble:
        _toggle_long_play_complete(request, last_scrobble, media_obj)
    else:
        # Covers both an unknown uuid and media without a long-play scrobble.
        messages.add_message(
            request, messages.ERROR, f"Media with uuid {media_uuid} not found."
        )
    return HttpResponseRedirect(success_url)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def scrobble_finish(request, pk):
    """Force-finish the user's scrobble ``pk`` and redirect back.

    The redirect goes to the referring page, or to the home page when no
    Referer header was sent.
    """
    user = request.user
    success_url = request.META.get("HTTP_REFERER") or reverse_lazy("vrobbler-home")

    if not user.is_authenticated:
        return HttpResponseRedirect(success_url)

    scrobble = Scrobble.objects.filter(user=user, pk=pk).first()
    if not scrobble:
        messages.add_message(request, messages.ERROR, "Scrobble not found.")
        return HttpResponseRedirect(success_url)

    scrobble.stop(force_finish=True)
    messages.add_message(
        request,
        messages.SUCCESS,
        f"Scrobble of {scrobble.media_obj} finished.",
    )
    return HttpResponseRedirect(success_url)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def scrobble_cancel(request, pk):
    """Cancel the user's scrobble ``pk`` and redirect back.

    The redirect goes to the referring page, or to the home page when no
    Referer header was sent.
    """
    user = request.user
    success_url = reverse_lazy("vrobbler-home")

    if not user.is_authenticated:
        return HttpResponseRedirect(success_url)

    scrobble = Scrobble.objects.filter(user=user, pk=pk).first()
    if scrobble:
        scrobble.cancel()
        messages.add_message(
            request,
            messages.SUCCESS,
            f"Scrobble of {scrobble.media_obj} cancelled.",
        )
    else:
        messages.add_message(request, messages.ERROR, "Scrobble not found.")

    # Fix: the Referer header is optional; use the already-computed home URL
    # instead of redirecting to None.
    return HttpResponseRedirect(request.META.get("HTTP_REFERER") or success_url)
@require_POST
def add_to_mopidy_queue(request, pk):
    """Queue a task adding the user's scrobble ``pk`` to the Mopidy queue.

    Always redirects to the scrobble's detail page. Anonymous requests are
    redirected without doing anything; a missing Mopidy API URL on the profile
    yields an error message instead of queueing.
    """
    detail_redirect = redirect("scrobbles:detail", pk=pk)
    if not request.user.is_authenticated:
        return detail_redirect

    scrobble = get_object_or_404(Scrobble, pk=pk, user=request.user)

    if not request.user.profile.mopidy_api_url:
        messages.add_message(
            request,
            messages.ERROR,
            "Mopidy API URL not configured in your profile settings.",
        )
        return detail_redirect

    # Imported locally, as in the original implementation.
    from scrobbles.tasks import add_scrobble_to_mopidy_queue as task

    task.delay(scrobble.id)
    messages.add_message(
        request,
        messages.SUCCESS,
        f'Adding "{scrobble.media_obj}" to Mopidy queue.',
    )
    return detail_redirect
@require_POST
def add_to_mopidy_monthly_playlist(request, pk):
    """Queue a task adding the user's scrobble ``pk`` to the monthly playlist.

    The playlist name shown in the message is the profile's
    ``monthly_mopidy_playlist_pattern`` formatted with the current time in the
    user's timezone. Always redirects to the scrobble's detail page.
    """
    detail_redirect = redirect("scrobbles:detail", pk=pk)
    if not request.user.is_authenticated:
        return detail_redirect

    scrobble = get_object_or_404(Scrobble, pk=pk, user=request.user)
    profile = request.user.profile
    pattern = profile.monthly_mopidy_playlist_pattern

    # Both settings are needed; report once if either is missing.
    if not (pattern and profile.mopidy_api_url):
        messages.add_message(
            request,
            messages.ERROR,
            "Monthly playlist pattern or Mopidy API URL not configured in your profile.",
        )
        return detail_redirect

    playlist_name = DateFormat(now_user_timezone(profile)).format(pattern)

    # Imported locally, as in the original implementation.
    from scrobbles.tasks import add_scrobble_to_mopidy_monthly_playlist as task

    task.delay(scrobble.id)
    messages.add_message(
        request,
        messages.SUCCESS,
        f'Adding "{scrobble.media_obj}" to monthly playlist "{playlist_name}".',
    )
    return detail_redirect
@require_POST
def toggle_favorite(request, media_type, object_id):
    """Toggle the favorite state of a media object for the current user.

    ``media_type`` must be one of the keys in the mapping below; ``object_id``
    is the object's primary key. AJAX requests (``X-Requested-With:
    XMLHttpRequest``) get a JSON body; everything else is redirected to the
    referring page, or ``/`` when there is none.
    """
    back_url = request.META.get("HTTP_REFERER", "/")
    if not request.user.is_authenticated:
        return HttpResponseRedirect(back_url)

    # media_type -> (app label, model name)
    app_model_map = {
        "Video": ("videos", "Video"),
        "Channel": ("videos", "Channel"),
        "Track": ("music", "Track"),
        "PodcastEpisode": ("podcasts", "PodcastEpisode"),
        "SportEvent": ("sports", "SportEvent"),
        "Book": ("books", "Book"),
        "Paper": ("books", "Paper"),
        "VideoGame": ("videogames", "VideoGame"),
        "BoardGame": ("boardgames", "BoardGame"),
        "GeoLocation": ("locations", "GeoLocation"),
        "Beer": ("beers", "Beer"),
        "Puzzle": ("puzzles", "Puzzle"),
        "Food": ("foods", "Food"),
        "Trail": ("trails", "Trail"),
        "Task": ("tasks", "Task"),
        "WebPage": ("webpages", "WebPage"),
        "LifeEvent": ("lifeevents", "LifeEvent"),
        "Mood": ("moods", "Mood"),
        "BrickSet": ("bricksets", "BrickSet"),
        "BirdingLocation": ("birds", "BirdingLocation"),
        "DiscGolfCourse": ("discgolf", "DiscGolfCourse"),
    }

    target = app_model_map.get(media_type)
    if target is None:
        messages.add_message(
            request, messages.ERROR, f"Unknown media type: {media_type}"
        )
        return HttpResponseRedirect(back_url)

    model = apps.get_model(*target)
    media_obj = get_object_or_404(model, id=object_id)

    # A None result from toggle() is treated as "no longer a favorite".
    is_favorited = FavoriteMedia.toggle(media_obj, request.user) is not None
    if is_favorited:
        msg = f'Added "{media_obj}" to favorites.'
    else:
        msg = f'Removed "{media_obj}" from favorites.'

    if request.headers.get("x-requested-with") == "XMLHttpRequest":
        return JsonResponse({"is_favorited": is_favorited, "message": msg})

    messages.add_message(request, messages.SUCCESS, msg)
    return HttpResponseRedirect(back_url)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def export(request):
    """Stream a scrobble export file as an attachment download.

    Query parameters:
        export_type: export format handed to ``export_scrobbles``
            (defaults to ``"csv"``).
        start, end: optional range bounds, forwarded unchanged.

    Returns a ``FileResponse`` whose download name embeds the current
    timestamp and the extension reported by ``export_scrobbles``.
    """
    params = request.GET
    export_format = params.get("export_type", "csv")
    logger.debug(f"Exporting all scrobbles in format {export_format}")

    # NOTE(review): no user is passed to export_scrobbles here; confirm that
    # the helper scopes the export to the requesting user.
    temp_file, extension = export_scrobbles(
        start_date=params.get("start"),
        end_date=params.get("end"),
        format=export_format,
    )

    filename = f"vrobbler-export-{datetime.now()}.{extension}"
    # The open handle is handed to FileResponse, which takes over closing it.
    response = FileResponse(open(temp_file, "rb"))
    response["Content-Disposition"] = f'attachment; filename="{filename}"'
    return response
class ScrobbleStatusView(LoginRequiredMixin, TemplateView):
    """Status page showing what the logged-in user is currently doing.

    The context holds one entry per activity ("listening", "watching", ...),
    each set to the most recent matching scrobble or ``None``.
    """

    model = Scrobble
    template_name = "scrobbles/status.html"

    def get_context_data(self, **kwargs):
        """Return the template context with the latest scrobble per activity."""
        data = super().get_context_data(**kwargs)

        # Only the requesting user's scrobbles, newest first, so that
        # ``.first()`` below always yields the latest match.
        user_scrobble_qs = Scrobble.objects.filter(
            user=self.request.user
        ).order_by("-timestamp")

        # NOTE(review): ``progress_plays`` was referenced but never defined,
        # which raised NameError on every request. ``in_progress`` and
        # ``is_paused`` are assumed to be Scrobble fields -- confirm against
        # the model before merging.
        progress_plays = user_scrobble_qs.filter(
            in_progress=True, is_paused=False
        )

        data["listening"] = progress_plays.filter(
            Q(track__isnull=False) | Q(podcast_episode__isnull=False)
        ).first()

        # Context key -> scrobble foreign key that must be set for it.
        in_progress_keys = {
            "watching": "video",
            "going": "geo_location",
            "playing": "board_game",
            "sporting": "sport_event",
            "browsing": "web_page",
            "participating": "life_event",
            "working": "task",
        }
        for key, fk_field in in_progress_keys.items():
            data[key] = progress_plays.filter(
                **{f"{fk_field}__isnull": False}
            ).first()

        # Long plays: a session was played to completion but the long play
        # itself is not yet flagged complete.
        long_plays = user_scrobble_qs.filter(
            long_play_complete=False, played_to_completion=True
        )
        data["reading"] = long_plays.filter(book__isnull=False).first()
        data["sessioning"] = long_plays.filter(video_game__isnull=False).first()
        return data
class ScrobbleDetailView(DetailView):
    """Detail page for one scrobble, with an editable log form.

    GET is allowed for PUBLIC scrobbles and for the owner. POST (saving the
    log form) is restricted to the owner.
    """

    model = Scrobble
    paginate_by = 100

    # Media type name -> name of the foreign key used to filter by that media
    # object (used here on Scrobble and FavoriteMedia).
    MEDIA_FK_MAP = {
        "Video": "video",
        "Track": "track",
        "PodcastEpisode": "podcast_episode",
        "Book": "book",
        "Paper": "paper",
        "VideoGame": "video_game",
        "BoardGame": "board_game",
        "Beer": "beer",
        "Food": "food",
        "Puzzle": "puzzle",
        "Trail": "trail",
        "GeoLocation": "geo_location",
        "Task": "task",
        "WebPage": "web_page",
        "LifeEvent": "life_event",
        "Mood": "mood",
        "BrickSet": "brick_set",
        "BirdingLocation": "birding_location",
    }

    def get_object(self, queryset=None):
        """Return the scrobble if it is PUBLIC or owned by the requester.

        Raises:
            Http404: for any other scrobble.
        """
        scrobble = super().get_object(queryset=queryset)
        user = self.request.user
        if scrobble.visibility == Visibility.PUBLIC:
            return scrobble
        if user.is_authenticated and scrobble.user == user:
            return scrobble
        raise Http404

    def get_form_class(self):
        """Return the log form class provided by the media object's log data class."""
        return self.object.media_obj.logdata_cls().form()

    def get_form(self):
        """Build an unbound log form pre-filled from the scrobble's log."""
        FormClass = self.get_form_class()
        log = self.object.log or {}
        notes = log.get("notes")
        if isinstance(notes, dict):
            log["notes"] = notes
        else:
            # Non-dict notes are flattened into newline-separated text.
            log["notes"] = self.object.logdata.notes_as_str(separator="\n")
        return FormClass(initial=log)

    def post(self, request, *args, **kwargs):
        """Validate the submitted log form and store it on the scrobble.

        Raises:
            PermissionDenied: when the requester does not own the scrobble.
        """
        from django.core.exceptions import PermissionDenied

        self.object = self.get_object()
        # get_object() also returns PUBLIC scrobbles owned by other users;
        # viewing those is fine, but only the owner may change the log.
        if not request.user.is_authenticated or self.object.user != request.user:
            raise PermissionDenied

        FormClass = self.get_form_class()
        form = FormClass(request.POST)
        if form.is_valid():
            data = form.cleaned_data.copy()

            # Disabled fields are not editable: keep the stored value.
            for field_name, field in form.fields.items():
                if field.disabled:
                    original_value = (self.object.log or {}).get(field_name)
                    data[field_name] = original_value

            # Replace selected objects with their ids before storing the log.
            if data.get("with_people_ids") is not None:
                data["with_people_ids"] = [p.id for p in data["with_people_ids"]]
            if data.get("mood_reason_ids") is not None:
                data["mood_reason_ids"] = [r.id for r in data["mood_reason_ids"]]
            if data.get("platform_id", False):
                data["platform_id"] = data["platform_id"].id
            if data.get("location_id", False):
                data["location_id"] = data["location_id"].id

            self.object.log = data
            self.object.save(update_fields=["log"])
            return redirect(self.object.get_absolute_url())

        context = self.get_context_data(log_form=form)
        return self.render_to_response(context)

    def get_context_data(self, **kwargs):
        """Add the log form, related scrobbles, favorite and long-play info."""
        context = super().get_context_data(**kwargs)
        if "log_form" not in context:
            context["log_form"] = self.get_form()

        media_obj = self.object.media_obj
        user = self.object.user
        media_type = self.object.media_type
        fk_field = self.MEDIA_FK_MAP.get(media_type)

        # Other scrobbles of the same media object by the scrobble's owner.
        if fk_field:
            related_scrobbles = (
                Scrobble.objects.filter(user=user)
                .filter(**{fk_field: media_obj})
                .order_by("-timestamp")
            )
        else:
            related_scrobbles = Scrobble.objects.none()

        page = self.request.GET.get("page")
        paginator = Paginator(related_scrobbles, self.paginate_by)
        try:
            context["related_scrobbles"] = paginator.page(page)
        except PageNotAnInteger:
            context["related_scrobbles"] = paginator.page(1)
        except EmptyPage:
            context["related_scrobbles"] = paginator.page(paginator.num_pages)

        if self.request.user.is_authenticated:
            if fk_field and media_obj:
                context["is_favorited"] = FavoriteMedia.objects.filter(
                    user=self.request.user, **{fk_field: media_obj}
                ).exists()

        if media_type == "Track" and media_obj:
            # Look for a mopidy URI in the 20 most recent scrobbles of the track.
            scrobbles = Scrobble.objects.filter(
                track=media_obj, user=self.object.user
            ).order_by("-timestamp")[:20]
            context["has_mopidy_uri"] = any(
                (s.log or {}).get("raw_data", {}).get("mopidy_uri")
                for s in scrobbles
            )
        else:
            context["has_mopidy_uri"] = False

        if self.object.is_long_play and fk_field:
            all_scrobbles = Scrobble.objects.filter(
                user=user, **{fk_field: media_obj}
            )
            completed = (
                all_scrobbles.filter(long_play_complete=True)
                .order_by("-timestamp")
                .first()
            )
            if completed and completed.long_play_seconds:
                context["long_play_total_seconds"] = completed.long_play_seconds
                context["long_play_finished_date"] = completed.timestamp
            else:
                # Not finished yet: report the running total without a date.
                latest_finished = (
                    all_scrobbles.filter(played_to_completion=True)
                    .order_by("-timestamp")
                    .first()
                )
                if latest_finished and latest_finished.long_play_seconds:
                    context["long_play_total_seconds"] = (
                        latest_finished.long_play_seconds
                    )
                    context["long_play_finished_date"] = None
        return context
class ScrobbleShareView(TemplateView):
    """Page for a scrobble reached through its share link (``sqid``)."""

    template_name = "scrobbles/scrobble_share.html"

    def get_object(self):
        """Resolve the share id to a scrobble and record the view.

        Raises:
            Http404: when the id does not decode to an ``(id, version)`` pair,
                the scrobble does not exist, the version differs from the
                scrobble's ``share_token_version``, or the scrobble is
                neither PUBLIC nor SHARED.
        """
        decoded = decode_scrobble_share(self.kwargs.get("sqid"))
        if not decoded or len(decoded) != 2:
            raise Http404

        scrobble_id, version = decoded
        scrobble = get_object_or_404(Scrobble, id=scrobble_id)

        version_mismatch = scrobble.share_token_version != version
        not_shareable = scrobble.visibility not in (
            Visibility.PUBLIC,
            Visibility.SHARED,
        )
        if version_mismatch or not_shareable:
            raise Http404

        # Increment through an F() expression in the database, then reload
        # the new value onto the instance.
        Scrobble.objects.filter(id=scrobble.id).update(
            share_view_count=F("share_view_count") + 1
        )
        scrobble.refresh_from_db(fields=["share_view_count"])

        meta = self.request.META
        ShareViewLog.objects.create(
            scrobble=scrobble,
            ip_address=meta.get("REMOTE_ADDR"),
            user_agent=meta.get("HTTP_USER_AGENT", "")[:500],
            referrer=meta.get("HTTP_REFERER", ""),
        )
        return scrobble

    def get_context_data(self, **kwargs):
        """Build the context: no log form, no related scrobbles, no mopidy."""
        context = super().get_context_data(**kwargs)
        scrobble = self.get_object()
        context.update(
            {
                "object": scrobble,
                "log_form": None,
                "related_scrobbles": Scrobble.objects.none(),
                "has_mopidy_uri": False,
            }
        )

        viewer = self.request.user
        if not viewer.is_authenticated:
            return context

        fk_field = ScrobbleDetailView.MEDIA_FK_MAP.get(scrobble.media_type)
        media_obj = scrobble.media_obj
        if fk_field and media_obj:
            favorites = FavoriteMedia.objects.filter(
                user=viewer, **{fk_field: media_obj}
            )
            context["is_favorited"] = favorites.exists()
        return context
class ScrobbleExploreView(ListView):
    """Paginated list of all PUBLIC scrobbles, newest first."""

    template_name = "scrobbles/scrobble_explore.html"
    model = Scrobble
    paginate_by = 100
    queryset = Scrobble.objects.order_by("-timestamp").filter(
        visibility=Visibility.PUBLIC
    )
class RegenerateShareTokenView(LoginRequiredMixin, View):
    """POST endpoint that regenerates the share token of an owned scrobble."""

    def post(self, request, pk):
        """Regenerate the token, then redirect to the scrobble's page."""
        # The lookup is scoped to request.user, so scrobbles owned by anyone
        # else produce a 404.
        owned_scrobble = get_object_or_404(Scrobble, user=request.user, pk=pk)
        owned_scrobble.regenerate_share_token()
        return redirect(owned_scrobble.get_absolute_url())
class ChangeVisibilityView(LoginRequiredMixin, View):
    """POST endpoint that sets the visibility of an owned scrobble."""

    def post(self, request, pk):
        """Apply the posted ``visibility`` if it is valid, then redirect.

        Values other than PUBLIC, SHARED or PRIVATE are ignored; the redirect
        to the scrobble's page happens either way.
        """
        scrobble = get_object_or_404(Scrobble, user=request.user, pk=pk)
        requested = request.POST.get("visibility")
        allowed = (Visibility.PUBLIC, Visibility.SHARED, Visibility.PRIVATE)
        if requested in allowed:
            scrobble.visibility = requested
            scrobble.save(update_fields=["visibility"])
        return redirect(scrobble.get_absolute_url())
class ScrobbleShareAnalyticsView(LoginRequiredMixin, DetailView):
    """Share-view log for one of the logged-in user's scrobbles."""

    template_name = "scrobbles/scrobble_share_analytics.html"
    model = Scrobble

    def get_queryset(self):
        """Restrict the lookup to scrobbles owned by the requesting user."""
        owner = self.request.user
        return Scrobble.objects.filter(user=owner)

    def get_context_data(self, **kwargs):
        """Add the 50 most recently created share views as ``share_views``."""
        context = super().get_context_data(**kwargs)
        newest_first = self.object.share_views.order_by("-created")
        context["share_views"] = newest_first[:50]
        return context
class BaseEmbeddableWidget(TemplateView):
    """Base view for public, embeddable "top media" widgets.

    Subclasses set the labels below and either ``model`` or pass a model to
    ``get_items``. The URL supplies ``user_id`` and optionally ``period``
    ("week", "month" or "year"; anything else is treated as a week).
    """

    template_name = "scrobbles/embeddable_top_media.html"
    media_type = "Media"
    count_label = "plays"
    no_data_message = "No scrobbles"
    model = None
    # Extra filter kwargs applied to the media model query in get_items.
    scrobble_filter = {}

    def get_user_and_profile(self):
        """Return ``(user, profile)`` for the ``user_id`` URL kwarg.

        Raises:
            Http404: when the user does not exist, has no profile, or has
                not enabled public widgets.
        """
        # This is a public endpoint: an unknown id must be a 404, not an
        # unhandled User.DoesNotExist (HTTP 500).
        user = get_object_or_404(User, id=self.kwargs.get("user_id"))
        profile = getattr(user, "profile", None)
        if profile is None or not profile.enable_public_widgets:
            raise Http404("Public widgets are not enabled for this user")
        return user, profile

    def get_date_range(self, user):
        """Return ``(start, end, label)`` for the requested period.

        ``start`` and ``end`` are dates forming a half-open range
        ``[start, end)``. The reference day is the ``date`` query parameter
        (``YYYY-MM-DD``) when it parses, otherwise today.
        """
        date_param = self.request.GET.get("date")
        if date_param:
            try:
                parsed_date = datetime.strptime(date_param, "%Y-%m-%d").date()
            except ValueError:
                parsed_date = None
            if parsed_date:
                now = parsed_date
            else:
                now = timezone.now().date()
        else:
            now = timezone.now()
            if user.is_authenticated:
                now = now_user_timezone(user.profile)
            now = now.date()

        period = self.kwargs.get("period", "week")
        if period == "month":
            start = now.replace(day=1)
            if now.month == 12:
                end = now.replace(year=now.year + 1, month=1, day=1)
            else:
                end = now.replace(month=now.month + 1, day=1)
            label = now.strftime("%B %Y")
        elif period == "year":
            start = now.replace(month=1, day=1)
            end = now.replace(year=now.year + 1, month=1, day=1)
            label = now.strftime("%Y")
        else:
            # isoweekday() % 7 is 0 on Sunday, so the week starts on Sunday.
            start = now - timedelta(days=now.isoweekday() % 7)
            end = start + timedelta(days=7)
            label = f"Week {now.isocalendar()[1]}"
        return start, end, label

    def get_items(self, user, start_date, end_date, model=None):
        """Return up to 10 media objects ordered by scrobble count.

        Args:
            user: owner of the scrobbles to count.
            start_date: inclusive lower bound on the scrobble timestamp.
            end_date: exclusive upper bound on the scrobble timestamp.
            model: media model to query; defaults to ``self.model``.

        Each returned object carries a ``count`` annotation, and a ``name``
        attribute copied from ``title`` when the model has no ``name``.
        """
        model = model or self.model
        queryset = (
            model.objects.filter(
                scrobble__user_id=user.id,
                scrobble__timestamp__gte=start_date,
                scrobble__timestamp__lt=end_date,
                **self.scrobble_filter,
            )
            .annotate(count=Count("scrobble", distinct=True))
            .order_by("-count")[:10]
        )
        items = list(queryset)
        for item in items:
            if not hasattr(item, "name") and hasattr(item, "title"):
                item.name = item.title
        return items

    def get_context_data(self, **kwargs):
        """Assemble the widget context: labels, period label and items."""
        context = super().get_context_data(**kwargs)
        user, profile = self.get_user_and_profile()
        context["user"] = user
        context["custom_css"] = profile.widget_custom_css
        context["media_type"] = self.media_type
        context["count_label"] = self.count_label

        start_date, end_date, period_label = self.get_date_range(user)
        context["period_label"] = period_label
        context["items"] = self.get_items(user, start_date, end_date)
        context["no_data_message"] = self.no_data_message
        return context
class EmbeddableTopArtistsWidget(TemplateView):
    """Public, embeddable widget listing a user's top artists.

    Items come from ``live_charts`` rather than a direct model query.
    """

    template_name = "scrobbles/embeddable_top_media.html"
    media_type = "Artists"
    count_label = "scrobbles"
    no_data_message = "No scrobbles"

    def get_user_and_profile(self):
        """Return ``(user, profile)`` for the ``user_id`` URL kwarg.

        Raises:
            Http404: when the user does not exist, has no profile, or has
                not enabled public widgets.
        """
        # This is a public endpoint: an unknown id must be a 404, not an
        # unhandled User.DoesNotExist (HTTP 500).
        user = get_object_or_404(User, id=self.kwargs.get("user_id"))
        profile = getattr(user, "profile", None)
        if profile is None or not profile.enable_public_widgets:
            raise Http404("Public widgets are not enabled for this user")
        return user, profile

    def get_date_range(self, user):
        """Return ``(now, label)`` for the requested period.

        ``now`` is an aware datetime: midnight of the ``date`` query
        parameter (``YYYY-MM-DD``) when it parses, otherwise the current
        time.
        """
        date_param = self.request.GET.get("date")
        if date_param:
            try:
                parsed_date = datetime.strptime(date_param, "%Y-%m-%d").date()
            except ValueError:
                parsed_date = None
            if parsed_date:
                now = datetime.combine(parsed_date, datetime.min.time())
                now = timezone.make_aware(now)
            else:
                now = timezone.now()
        else:
            now = timezone.now()
            if user.is_authenticated:
                now = now_user_timezone(user.profile)

        period = self.kwargs.get("period", "week")
        if period == "month":
            label = now.strftime("%B %Y")
        elif period == "year":
            label = now.strftime("%Y")
        else:
            label = f"Week {now.isocalendar()[1]}"
        return now, label

    def get_context_data(self, **kwargs):
        """Assemble the widget context from the live artist chart."""
        context = super().get_context_data(**kwargs)
        user, profile = self.get_user_and_profile()
        context["user"] = user
        context["custom_css"] = profile.widget_custom_css
        context["media_type"] = self.media_type
        context["count_label"] = self.count_label
        context["no_data_message"] = self.no_data_message

        now, period_label = self.get_date_range(user)
        period = self.kwargs.get("period", "week")
        # URL period -> chart period name passed to live_charts.
        period_map = {"week": "last7", "month": "last30", "year": "year"}
        chart_period = period_map.get(period, "last7")

        artist = {"user": user, "media_type": "Artist", "limit": 10, "as_of": now}
        items = list(live_charts(**artist, chart_period=chart_period))
        for item in items:
            # The shared template reads ``count``.
            item.count = item.num_scrobbles

        context["items"] = items
        context["period_label"] = period_label
        return context
class ScrobbleCalendarView(LoginRequiredMixin, TemplateView):
    """Month calendar of the logged-in user's scrobbles.

    Query parameters:
        date: month to show as ``YYYY-MM``; defaults to the current month,
            and falls back to it when the value is malformed or out of range.
        media_type: restrict the listed scrobbles to one of
            ``CALENDAR_MEDIA_TYPES``.
    """

    template_name = "scrobbles/calendar.html"
    CALENDAR_MEDIA_TYPES = [
        "Task",
        "BirdingLocation",
        "Food",
        "Trail",
        "VideoGame",
        "Book",
        "Mood",
        "Video",
        "BoardGame",
    ]
    MEDIA_EMOJI = {
        "Task": "",
        "BirdingLocation": "🐦",
        "Food": "🍽️",
        "Trail": "🥾",
        "VideoGame": "🎮",
        "Book": "📚",
        "Mood": "🥰",
        "Video": "🎬",
        "BoardGame": "🎲",
    }
    # Media types left out unless explicitly selected via ?media_type=.
    DEFAULT_EXCLUDE = ["Task", "Mood", "Video", "Food", "BoardGame"]
    MONTH_COLORS = [
        "#db7a7a",  # Jan
        "#db847a",  # Feb
        "#b0db7a",  # Mar
        "#7adb82",  # Apr
        "#7adbb3",  # May
        "#7ab6db",  # Jun
        "#7a8edb",  # Jul
        "#977adb",  # Aug
        "#c47adb",  # Sep
        "#db7ac5",  # Oct
        "#db7a90",  # Nov
        "#db7a7a",  # Dec
    ]

    def _day_color(self, month_index, day_num, total_days):
        """Return a ``#rrggbb`` color for one day of a month.

        The hue advances 30 degrees per month; lightness runs from 0.80
        towards 0.95 across the month, at a fixed saturation of 0.5.
        """
        import colorsys

        hue = (month_index - 1) * 30 / 360
        lightness = 0.80 + (day_num / total_days) * 0.15
        r, g, b = colorsys.hls_to_rgb(hue, lightness, 0.5)
        return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"

    @staticmethod
    def _parse_year_month(date_param, today):
        """Parse ``YYYY-MM`` into ``(year, month)``, falling back to ``today``.

        The fallback covers malformed input and out-of-range values such as
        month 13, year 0, or a month whose previous/next month cannot be
        represented.
        """
        try:
            year, month = [int(x) for x in date_param.split("-")]
            _, total_days = calendar.monthrange(year, month)
            # Also prove the prev/next navigation dates can be computed.
            datetime(year, month, 1) - timedelta(days=1)
            datetime(year, month, total_days) + timedelta(days=1)
        except (ValueError, IndexError, OverflowError):
            return today.year, today.month
        return year, month

    def get_context_data(self, **kwargs):
        """Build the calendar grid and navigation data for the month."""
        ctx = super().get_context_data(**kwargs)
        today = timezone.localdate()
        date_param = self.request.GET.get("date", today.isoformat()[:7])
        year, month = self._parse_year_month(date_param, today)

        _, total_days = calendar.monthrange(year, month)
        month_start = datetime(year, month, 1).date()
        month_end = datetime(year, month, total_days).date()
        first_weekday = month_start.weekday()
        # Any day inside the neighbouring month is enough: only YYYY-MM is
        # used from these.
        prev_month = month_start - timedelta(days=1)
        next_month = month_end + timedelta(days=1)

        media_type_filter = self.request.GET.get("media_type")
        if media_type_filter and media_type_filter in self.CALENDAR_MEDIA_TYPES:
            active_types = [media_type_filter]
        else:
            active_types = [
                t for t in self.CALENDAR_MEDIA_TYPES if t not in self.DEFAULT_EXCLUDE
            ]

        scrobbles = (
            Scrobble.objects.filter(
                user=self.request.user,
                timestamp__date__gte=month_start,
                timestamp__date__lte=month_end,
                media_type__in=active_types,
            )
            .select_related(
                "task",
                "birding_location",
                "food",
                "trail",
                "video_game",
                "book",
                "mood",
                "video",
                "board_game",
            )
            .order_by("timestamp")
        )

        from django.db.models.functions import TruncDate

        # Per-day totals over all media types, excluding GeoLocation
        # scrobbles whose location has no title.
        total_by_day = dict(
            Scrobble.objects.filter(
                user=self.request.user,
                timestamp__date__gte=month_start,
                timestamp__date__lte=month_end,
            )
            .exclude(Q(media_type="GeoLocation") & Q(geo_location__title__isnull=True))
            .annotate(
                local_date=TruncDate(
                    "timestamp", tzinfo=timezone.get_current_timezone()
                )
            )
            .values("local_date")
            .annotate(count=Count("id"))
            .values_list("local_date", "count")
        )

        # Bucket the listed scrobbles by local day of month.
        day_map = {d: [] for d in range(1, total_days + 1)}
        for scrobble in scrobbles:
            local_ts = timezone.localtime(scrobble.timestamp)
            day_map[local_ts.day].append(scrobble)

        # Give scrobbles without a uuid a fresh one and persist it.
        missing_uuids = [s for s in scrobbles if not s.uuid]
        if missing_uuids:
            for scrobble in missing_uuids:
                scrobble.uuid = uuid4()
            Scrobble.objects.bulk_update(missing_uuids, ["uuid"])

        calendar_days = []
        month_color = self.MONTH_COLORS[(month - 1) % 12]
        for day_num in range(1, total_days + 1):
            day_scrobbles = [
                {
                    "id": scrobble.pk,
                    "uuid": scrobble.uuid,
                    "emoji": self.MEDIA_EMOJI.get(scrobble.media_type, "📌"),
                    "title": (
                        str(scrobble.media_obj)
                        if scrobble.media_obj
                        else scrobble.media_type
                    ),
                    "media_type": scrobble.media_type,
                }
                for scrobble in day_map[day_num]
            ]
            calendar_days.append(
                {
                    "day": day_num,
                    "scrobbles": day_scrobbles,
                    "total_count": total_by_day.get(
                        datetime(year, month, day_num).date(), 0
                    ),
                    "is_today": year == today.year
                    and month == today.month
                    and day_num == today.day,
                    "color": self._day_color(month, day_num, total_days),
                }
            )

        ctx.update(
            {
                "year": year,
                "month": month,
                "month_name": month_start.strftime("%B"),
                "total_days": total_days,
                "first_weekday": first_weekday,
                "calendar_days": calendar_days,
                "prev_month": prev_month.isoformat()[:7],
                "next_month": next_month.isoformat()[:7],
                "today": today,
                "current_month": today.isoformat()[:7],
                "day_names": ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"],
                "month_color": month_color,
                "active_filter": media_type_filter or "",
                "media_types": [
                    {"name": mt, "emoji": self.MEDIA_EMOJI.get(mt, "📌")}
                    for mt in self.CALENDAR_MEDIA_TYPES
                ],
            }
        )
        return ctx
class ScrobbleSearchView(LoginRequiredMixin, TemplateView):
    """Search the logged-in user's scrobbles by text and/or media type.

    Query parameters:
        q: text matched case-insensitively against log notes/title/
            description, the scrobble source, tag names and the media
            fields listed in ``MEDIA_FIELDS``.
        media_type: restrict results to one media type.

    At most 100 results are returned, newest first.
    """

    # Media type -> lookups on Scrobble that are searched for that type.
    # ``None`` entries are placeholders and are skipped.
    MEDIA_FIELDS = {
        "Video": ["video__title", "video__overview"],
        "Track": ["track__title", "track__artists__name", "track__album__name"],
        "PodcastEpisode": ["podcast_episode__title", None],
        "Book": ["book__title", "book__summary"],
        "Paper": ["paper__title", None],
        "VideoGame": ["video_game__title", "video_game__summary"],
        "BoardGame": ["board_game__title", "board_game__description"],
        "Beer": ["beer__name", "beer__style"],
        "Food": ["food__name", "food__description"],
        "Puzzle": ["puzzle__name", "puzzle__description"],
        "Trail": ["trail__name", "trail__location"],
        "GeoLocation": ["geo_location__title", None],
        "Task": ["task__title", "task__description"],
        "WebPage": ["web_page__title", None],
        "LifeEvent": ["life_event__title", "life_event__description"],
        "Mood": ["mood__title", "mood__description"],
        "BrickSet": ["brick_set__title", None],
        "BirdingLocation": ["birding_location__title", "birding_location__description"],
    }
    template_name = "scrobbles/scrobble_search.html"

    def get(self, request, *args, **kwargs):
        """Render the empty search page, or run a search when filters are set."""
        if not request.GET.get("q") and not request.GET.get("media_type"):
            return super().get(request, *args, **kwargs)
        return self.search(request)

    def search(self, request):
        """Run the search described by the query string and render results."""
        query = request.GET.get("q", "").strip()
        media_type = request.GET.get("media_type", "")
        user = request.user

        # Everything below narrows this queryset, so results always stay
        # within the requesting user's scrobbles and the chosen media type.
        scrobbles = Scrobble.objects.filter(user=user)
        if media_type:
            scrobbles = scrobbles.filter(media_type=media_type)

        if query:
            from django.contrib.contenttypes.models import ContentType
            from taggit.models import TaggedItem

            scrobble_ct = ContentType.objects.get_for_model(Scrobble)
            tag_match_ids = list(
                TaggedItem.objects.filter(
                    content_type=scrobble_ct,
                    tag__name__icontains=query,
                ).values_list("object_id", flat=True)
            )

            # Tag matches are part of the base condition so they apply in
            # both branches and are subject to the media_type filter. They
            # used to be dropped when a known media type was selected and
            # to bypass the filter otherwise.
            q = (
                Q(log__notes__icontains=query)
                | Q(log__title__icontains=query)
                | Q(log__description__icontains=query)
                | Q(source__icontains=query)
                | Q(id__in=tag_match_ids)
            )

            if media_type and media_type in self.MEDIA_FIELDS:
                for field in self.MEDIA_FIELDS[media_type]:
                    if field:
                        q |= Q(**{f"{field}__icontains": query})
                scrobbles = scrobbles.filter(q).distinct()
            else:
                all_matching_ids = set(
                    scrobbles.filter(q).values_list("id", flat=True)
                )
                # One query per media type so that a failing lookup only
                # loses that type's matches.
                for mt, fields in self.MEDIA_FIELDS.items():
                    mt_q = Q()
                    for field in fields:
                        if field:
                            mt_q |= Q(**{f"{field}__icontains": query})
                    if not mt_q:
                        continue
                    try:
                        mt_matches = scrobbles.filter(mt_q).values_list(
                            "id", flat=True
                        )
                        all_matching_ids.update(mt_matches)
                    except Exception as exc:
                        # Best effort: skip this media type but leave a
                        # trace instead of failing silently.
                        logger.warning(
                            "Scrobble search skipped media type %s: %s", mt, exc
                        )
                scrobbles = scrobbles.filter(id__in=all_matching_ids)

        scrobbles = scrobbles.order_by("-timestamp")[:100]
        context = self.get_context_data(
            scrobbles=scrobbles,
            query=query,
            media_type=media_type,
            media_types=Scrobble.MediaType.choices,
        )
        return self.render_to_response(context)

    def get_context_data(self, **kwargs):
        """Return the context, filling in defaults for an empty search page."""
        ctx = super().get_context_data(**kwargs)
        ctx.setdefault("scrobbles", [])
        ctx.setdefault("query", "")
        ctx.setdefault("media_type", "")
        if "media_types" not in ctx:
            ctx["media_types"] = Scrobble.MediaType.choices
        return ctx
class EmbeddableTopBoardGamesWidget(BaseEmbeddableWidget):
    """Embeddable widget for board games, counting only scrobbles played to completion."""

    no_data_message = "No board games played"
    count_label = "plays"
    media_type = "Board Games"
    scrobble_filter = {"scrobble__played_to_completion": True}

    def get_items(self, user, start_date, end_date):
        """Return the base widget's items computed over the BoardGame model."""
        # Imported inside the method, as elsewhere in this module.
        from boardgames.models import BoardGame

        return super().get_items(user, start_date, end_date, model=BoardGame)
class EmbeddableTopBooksWidget(BaseEmbeddableWidget):
    """Embeddable widget for books, flagging those completed in the period."""

    no_data_message = "No books read"
    count_label = "reads"
    media_type = "Books"

    def get_items(self, user, start_date, end_date):
        """Return up to 10 books ordered by scrobble count in the period.

        Each book carries ``count`` and ``is_completed`` annotations; the
        latter is true when a scrobble in the period has
        ``long_play_complete`` set. ``name`` is copied from ``title`` when
        the model has no ``name`` attribute.
        """
        from books.models import Book
        from django.db.models import Count, Exists, OuterRef

        # Conditions shared by the main query and the completion subquery.
        # They are passed in a single filter() call each time, exactly like
        # the spelled-out keyword arguments.
        in_period = {
            "scrobble__user_id": user.id,
            "scrobble__timestamp__gte": start_date,
            "scrobble__timestamp__lt": end_date,
        }
        completed_subquery = Book.objects.filter(
            scrobble__long_play_complete=True,
            pk=OuterRef("pk"),
            **in_period,
        )
        annotated = Book.objects.filter(**in_period).annotate(
            count=Count("scrobble", distinct=True),
            is_completed=Exists(completed_subquery),
        )
        items = list(annotated.order_by("-count")[:10])

        for book in items:
            if hasattr(book, "title") and not hasattr(book, "name"):
                book.name = book.title
        return items
class EmbeddableTopTrailsWidget(BaseEmbeddableWidget):
    """Embeddable widget for trails."""

    no_data_message = "No trails visited"
    count_label = "visits"
    media_type = "Trails"

    def get_items(self, user, start_date, end_date):
        """Return the base widget's items computed over the Trail model."""
        from trails.models import Trail

        return super().get_items(user, start_date, end_date, model=Trail)
class EmbeddableTopFoodsWidget(BaseEmbeddableWidget):
    """Embeddable widget for foods."""

    no_data_message = "No foods logged"
    count_label = "meals"
    media_type = "Foods"

    def get_items(self, user, start_date, end_date):
        """Return the base widget's items computed over the Food model."""
        from foods.models import Food

        return super().get_items(user, start_date, end_date, model=Food)
class EmbeddableTopTasksWidget(BaseEmbeddableWidget):
    """Embeddable widget for tasks."""

    no_data_message = "No tasks logged"
    count_label = "sessions"
    media_type = "Tasks"

    def get_items(self, user, start_date, end_date):
        """Return the base widget's items computed over the Task model."""
        from tasks.models import Task

        return super().get_items(user, start_date, end_date, model=Task)