303 lines
9.4 KiB
Python
303 lines
9.4 KiB
Python
import logging
|
|
from dataclasses import dataclass
|
|
from functools import cached_property
|
|
from typing import Optional
|
|
from uuid import uuid4
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.files import File
|
|
from django.db import models
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django_extensions.db.models import TimeStampedModel
|
|
from imagekit.models import ImageSpecField
|
|
from imagekit.processors import ResizeToFit
|
|
from locations.models import GeoLocation
|
|
from scrobbles.dataclasses import BaseLogData, WithPeopleLogData
|
|
from scrobbles.mixins import ScrobblableConstants, ScrobblableMixin
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# The user model class returned by Django's ``get_user_model()``.
User = get_user_model()

# Shared keyword arguments for model fields that may be left blank and
# stored as NULL; unpacked into field definitions with ``**BNULL``.
BNULL = dict(blank=True, null=True)
|
|
|
|
|
|
@dataclass
class BirdSightingEntry(BaseLogData):
    """A single bird entry inside a birding log: which bird, how many, notes."""

    # Primary key of the related ``Bird`` row; a falsy value means no bird.
    bird_id: Optional[int] = None
    # Count rendered as ``x<quantity>`` in the string form.
    quantity: int = 1
    # Optional free-text notes appended to the string form in parentheses.
    sighting_notes: Optional[str] = None

    @property
    def bird(self) -> Optional["Bird"]:
        """Return the ``Bird`` whose id is ``bird_id``, or ``None``.

        ``None`` is returned when ``bird_id`` is falsy or no row matches.
        Every access runs a database query, so callers that need the value
        more than once should bind it to a local first.
        """
        if not self.bird_id:
            return None
        return Bird.objects.filter(id=self.bird_id).first()

    def __str__(self) -> str:
        """Return ``"<common name> x<quantity>"`` plus ``" (<notes>)"`` if set.

        The name falls back to ``"Unknown"`` when no bird can be resolved.
        """
        # Resolve the bird once: the property queries the database on each
        # access, and the original read it twice.
        bird = self.bird
        name = bird.common_name if bird else "Unknown"
        out = f"{name} x{self.quantity}"
        if self.sighting_notes:
            out += f" ({self.sighting_notes})"
        return out
|
|
|
|
|
|
@dataclass
class BirdSightingLogData(BaseLogData, WithPeopleLogData):
    """Structured log data for a birding outing, with text and HTML renderers."""

    # Sighting entries. NOTE(review): despite the annotation, every consumer
    # in this class unpacks each item with ``**``, so items are expected to be
    # mappings of ``BirdSightingEntry`` keyword arguments -- confirm.
    birds: Optional[list[BirdSightingEntry]] = None
    duration_minutes: Optional[int] = None
    observation_type: Optional[str] = None
    distance: Optional[str] = None
    area: Optional[str] = None
    party_size: Optional[int] = None
    complete_checklist: Optional[bool] = None
    weather: Optional[str] = None
    temperature: Optional[int] = None
    guide: Optional[str] = None

    # Unannotated, so this is a plain class attribute rather than a dataclass
    # field. Presumably consumed by the base log-data classes -- verify there.
    _excluded_fields = {}

    @cached_property
    def bird_list(self) -> str:
        """Return the sightings as a comma-separated string ("" when empty).

        Each entry is rendered with ``str(BirdSightingEntry(...))``. The result
        is cached on the instance after the first access.
        """
        if not self.birds:
            return ""
        return ", ".join(str(BirdSightingEntry(**b)) for b in self.birds)

    def as_html(self) -> str:
        """Render the populated fields as concatenated ``<div>`` fragments.

        Fields are emitted in a fixed order and skipped when empty.
        ``complete_checklist`` is shown whenever it is not ``None`` (so
        ``False`` is rendered), and ``temperature`` is shown for ``0`` as well
        as any other truthy value. All interpolated values are HTML-escaped
        because the result is raw markup.

        Returns:
            The HTML string, or ``""`` when nothing is populated.
        """
        # Function-scope import keeps this change self-contained.
        from html import escape

        def div(css_class: str, text: object) -> str:
            # Escape the text so values such as notes cannot inject markup.
            return f'<div class="{css_class}">{escape(str(text))}</div>'

        # (css class, rendered text, whether to show) in output order.
        rows = [
            (
                "birding-obs-type",
                f"Type: {self.observation_type}",
                bool(self.observation_type),
            ),
            (
                "birding-distance",
                f"Distance: {self.distance}",
                bool(self.distance),
            ),
            ("birding-area", f"Area: {self.area}", bool(self.area)),
            (
                "birding-party",
                f"Party size: {self.party_size}",
                bool(self.party_size),
            ),
            (
                "birding-checklist",
                f"Complete checklist: {self.complete_checklist}",
                self.complete_checklist is not None,
            ),
            ("birding-weather", f"Weather: {self.weather}", bool(self.weather)),
            (
                "birding-temp",
                f"Temp: {self.temperature}°",
                # A plain truthiness test would hide a valid 0-degree reading.
                bool(self.temperature or self.temperature == 0),
            ),
            ("birding-guide", f"Guide: {self.guide}", bool(self.guide)),
            (
                "birding-duration",
                f"Duration: {self.duration_minutes} min",
                bool(self.duration_minutes),
            ),
        ]
        html_parts = [div(css, text) for css, text, show in rows if show]

        if self.birds:
            birds_html = []
            for bird_data in self.birds:
                sighting = BirdSightingEntry(**bird_data)
                # ``sighting.bird`` queries the database on each access, so
                # resolve it once per entry.
                bird = sighting.bird
                label = bird.common_name if bird else "Unknown"
                label += f" x{sighting.quantity}"
                if sighting.sighting_notes:
                    label += f" \u2014 {sighting.sighting_notes}"
                birds_html.append(div("bird-sighting", label))
            html_parts.append(
                f'<div class="bird-sightings">{"".join(birds_html)}</div>'
            )

        return "".join(html_parts)

    @classmethod
    def override_fields(cls) -> dict:
        """Return form-field overrides, merged from base classes plus ``birds``.

        Walks the MRO after ``cls`` and merges the result of every base that
        exposes ``override_fields``, then sets ``"birds"`` to a non-required
        ``BirdSightingsField``.
        """
        # Imported at call time, as in the original (presumably to avoid a
        # circular import between models and forms -- confirm).
        from birds.forms import BirdSightingsField

        fields = {}
        # NOTE(review): bases are merged in MRO order, so on a key clash the
        # base furthest from ``cls`` wins. Kept as-is; confirm it is intended.
        for base in cls.mro()[1:]:
            if hasattr(base, "override_fields"):
                fields.update(base.override_fields())
        fields["birds"] = BirdSightingsField(required=False)
        return fields
|
|
|
|
|
|
class Bird(TimeStampedModel):
    """A bird record with names, an eBird code and an optional photo."""

    uuid = models.UUIDField(editable=False, default=uuid4, **BNULL)
    common_name = models.CharField(max_length=255)
    scientific_name = models.CharField(max_length=255, **BNULL)
    description = models.TextField(**BNULL)
    ebird_code = models.CharField(db_index=True, max_length=255, **BNULL)
    photo = models.ImageField(upload_to="birds/photos/", **BNULL)
    # JPEG renditions derived from ``photo``, fitted within the given box.
    photo_small = ImageSpecField(
        source="photo",
        format="JPEG",
        processors=[ResizeToFit(100, 100)],
        options={"quality": 60},
    )
    photo_medium = ImageSpecField(
        source="photo",
        format="JPEG",
        processors=[ResizeToFit(300, 300)],
        options={"quality": 75},
    )

    def __str__(self):
        """Return the bird's common name."""
        return self.common_name

    def get_absolute_url(self):
        """Return the ``birds:bird_detail`` URL, using the uuid as the slug."""
        url_kwargs = {"slug": self.uuid}
        return reverse("birds:bird_detail", kwargs=url_kwargs)

    @classmethod
    def find_or_create(cls, common_name: str) -> "Bird":
        """Return the first bird matching ``common_name`` case-insensitively.

        When no such bird exists, a new one is created with that name.
        """
        existing = cls.objects.filter(common_name__iexact=common_name).first()
        if existing is not None:
            return existing
        return cls.objects.create(common_name=common_name)
|
|
|
|
|
|
class BirdingLocation(ScrobblableMixin):
    """A scrobblable place where birding happens."""

    description = models.TextField(**BNULL)
    geo_location = models.ForeignKey(
        GeoLocation, on_delete=models.DO_NOTHING, **BNULL
    )
    ebird_hotspot_id = models.CharField(max_length=255, **BNULL)

    def get_absolute_url(self):
        """Return the ``birds:birding_location_detail`` URL for this location."""
        # ``uuid`` is not declared here; presumably provided by the mixin.
        url_kwargs = {"slug": self.uuid}
        return reverse("birds:birding_location_detail", kwargs=url_kwargs)

    @property
    def subtitle(self):
        """Return an empty subtitle."""
        return ""

    @property
    def strings(self) -> ScrobblableConstants:
        """Return the verb and tag constants used for this media type."""
        constants = ScrobblableConstants(verb="Birding at", tags="bird")
        return constants

    @property
    def logdata_cls(self):
        """Return the log-data class for this type: ``BirdSightingLogData``."""
        return BirdSightingLogData

    def primary_image_url(self) -> str:
        """Return an empty string; no image URL is produced here."""
        return ""

    def fix_metadata(self) -> None:
        """Do nothing; there is no metadata to fix for this type."""
        return None

    @classmethod
    def find_or_create(cls, title: str) -> "BirdingLocation":
        """Return the first location matching ``title`` case-insensitively.

        When no such location exists, a new one is created with that title.
        """
        existing = cls.objects.filter(title__iexact=title).first()
        if existing is not None:
            return existing
        return cls.objects.create(title=title)
|
|
|
|
|
|
class BirdingCSVImport(TimeStampedModel):
    """An uploaded birding CSV file and the bookkeeping for importing it."""

    user = models.ForeignKey(User, on_delete=models.DO_NOTHING, **BNULL)
    uuid = models.UUIDField(editable=False, default=uuid4)
    processing_started = models.DateTimeField(**BNULL)
    processed_finished = models.DateTimeField(**BNULL)
    # One line per imported scrobble: ``id<TAB>timestamp<TAB>media_obj``.
    process_log = models.TextField(**BNULL)
    process_count = models.IntegerField(**BNULL)
    csv_file = models.FileField(upload_to="birding-csv-uploads/", **BNULL)

    class Meta:
        verbose_name = "Birding CSV Import"

    def __str__(self):
        """Return a label that includes the human-readable start time."""
        return f"Birding import on {self.human_start}"

    @property
    def human_start(self):
        """Return ``processing_started`` formatted, or ``"Unknown"`` if unset."""
        if self.processing_started:
            return self.processing_started.strftime("%B %d, %Y at %H:%M")
        return "Unknown"

    @property
    def import_type(self):
        """Return the display name of this import type."""
        return "Birding CSV"

    def get_absolute_url(self):
        """Return the ``birds:csv_import_detail`` URL for this import."""
        return reverse("birds:csv_import_detail", kwargs={"slug": self.uuid})

    @property
    def upload_file_path(self):
        """Return the CSV file's URL when S3 storage is on, else its local path."""
        # Default to False so a project that never defines the setting falls
        # back to the local path instead of raising AttributeError.
        if getattr(settings, "USE_S3_STORAGE", False):
            return self.csv_file.url
        return self.csv_file.path

    def mark_started(self):
        """Stamp ``processing_started`` with the current time and save it."""
        self.processing_started = timezone.now()
        self.save(update_fields=["processing_started"])

    def mark_finished(self):
        """Stamp ``processed_finished`` with the current time and save it."""
        self.processed_finished = timezone.now()
        self.save(update_fields=["processed_finished"])

    def record_log(self, scrobbles):
        """Store one log line per scrobble and the total count, then save.

        Args:
            scrobbles: Iterable of objects exposing ``id``, ``timestamp`` and
                ``media_obj``. ``None`` or an empty iterable records an empty
                log and a count of zero.
        """
        lines = [
            f"{scrobble.id}\t{scrobble.timestamp}\t{scrobble.media_obj}"
            for scrobble in (scrobbles or [])
        ]
        # Join once instead of growing the string inside the loop.
        self.process_log = "\n".join(lines)
        self.process_count = len(lines)
        self.save(update_fields=["process_log", "process_count"])

    def scrobbles(self):
        """Return a queryset of the scrobbles whose ids appear in the log."""
        # Imported at call time, as in the original (presumably to avoid a
        # circular import -- confirm).
        from scrobbles.models import Scrobble

        scrobble_ids = []
        if self.process_log:
            for line in self.process_log.split("\n"):
                # The id is the first tab-separated column of each line.
                sid = line.partition("\t")[0]
                if sid:
                    scrobble_ids.append(sid)
        return Scrobble.objects.filter(id__in=scrobble_ids)

    def process(self, force=False):
        """Import the CSV file and record the resulting scrobbles.

        Args:
            force: When true, run again even if ``processed_finished`` is set.
        """
        if self.processed_finished and not force:
            logger.info(
                "%s already processed on %s", self, self.processed_finished
            )
            return

        # Imported at call time, as in the original.
        from birds.importer import import_birding_csv

        self.mark_started()
        scrobbles = import_birding_csv(self.upload_file_path, self.user_id)
        self.record_log(scrobbles)
        self.mark_finished()
|