187 lines
5.8 KiB
Python
187 lines
5.8 KiB
Python
import csv
|
|
import logging
|
|
import re
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.contrib.auth import get_user_model
|
|
|
|
from birds.models import Bird, BirdSightingEntry, BirdSightingLogData, BirdingLocation
|
|
from scrobbles.models import Scrobble
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
# User model class as returned by Django's get_user_model().
User = get_user_model()
|
|
|
|
# Matches a parenthesised "(lat, lon)" pair and captures both numbers as text.
# The character class is permissive (any run of digits, dots and hyphens), so
# a match is not guaranteed to be convertible to float.
LOCATION_COORDS_RE = re.compile(r"\(([\d\.\-]+),\s*([\d\.\-]+)\)")
|
|
# Captures the integer in front of the word "minute", e.g. "45 minute(s)".
DURATION_RE = re.compile(r"(\d+)\s*minute")
# Captures the integer in front of the word "hour", e.g. the 1 in
# "1 hour(s), 30 minute(s)".
_DURATION_HOURS_RE = re.compile(r"(\d+)\s*hour")


def parse_duration(duration_str):
    """Convert a free-text duration into a whole number of minutes.

    Both an hours component and a minutes component are recognised, so
    "1 hour(s), 30 minute(s)" yields 90 and "2 hours" yields 120.  A string
    carrying only minutes behaves as before ("45 minutes" -> 45).

    Args:
        duration_str: Raw duration text, or None / "" when the cell is empty.

    Returns:
        Total minutes as an int, or None when the input is empty or contains
        neither an hours nor a minutes component.
    """
    if not duration_str:
        return None

    hours_match = _DURATION_HOURS_RE.search(duration_str)
    minutes_match = DURATION_RE.search(duration_str)
    if hours_match is None and minutes_match is None:
        return None

    total_minutes = 0
    if hours_match is not None:
        total_minutes += int(hours_match.group(1)) * 60
    if minutes_match is not None:
        total_minutes += int(minutes_match.group(1))
    return total_minutes
|
|
|
|
|
|
def parse_coords(location_str):
    """Extract a "(lat, lon)" pair embedded in a location string.

    Args:
        location_str: Location text that may contain parenthesised
            coordinates, or None / "" when there is no location.

    Returns:
        A ``(lat, lon)`` tuple of floats, or ``(None, None)`` when the input
        is empty, holds no coordinate pair, or the captured text is not a
        valid number.
    """
    if not location_str:
        return None, None

    match = LOCATION_COORDS_RE.search(location_str)
    if match is None:
        return None, None

    try:
        return float(match.group(1)), float(match.group(2))
    except ValueError:
        # The pattern accepts any run of digits, dots and hyphens, so captures
        # such as "-" or "1-2" reach float() and must not abort the caller.
        return None, None
|
|
|
|
|
|
def parse_timestamp(date_str, time_str):
    """Build a naive datetime from separate date and time strings.

    The combined "<date> <time>" text is tried first using the layout
    "%B %d, %Y %I:%M %p"; if that fails, the date alone is parsed with
    "%B %d, %Y".

    Args:
        date_str: Date text such as "January 5, 2024".
        time_str: Time text such as "7:30 AM"; may be empty or None.

    Returns:
        The parsed datetime, or None (after logging a warning) when neither
        layout matches.
    """
    # First attempt: full date plus 12-hour clock time.
    try:
        return datetime.strptime(f"{date_str} {time_str}", "%B %d, %Y %I:%M %p")
    except (ValueError, TypeError):
        pass

    # Second attempt: the date on its own.
    try:
        return datetime.strptime(date_str, "%B %d, %Y")
    except (ValueError, TypeError):
        pass

    logger.warning(f"Could not parse date/time: {date_str} {time_str}")
    return None
|
|
|
|
|
|
def parse_bool(value):
    """Interpret a text cell as a tri-state boolean.

    Args:
        value: Raw cell text, or None / "" when the cell is empty.

    Returns:
        None for an empty value; otherwise True when the trimmed, lower-cased
        text is "true", "yes" or "1", and False for anything else.
    """
    if value:
        normalized = value.strip().lower()
        return normalized in ("true", "yes", "1")
    return None
|
|
|
|
|
|
def parse_int(value):
    """Parse a text cell as an integer.

    Args:
        value: Raw cell text, or None / "" when the cell is empty.

    Returns:
        The integer value of the trimmed text, or None when the value is
        empty or is not a valid integer literal.
    """
    if value:
        try:
            return int(value.strip())
        except (ValueError, TypeError):
            pass
    return None
|
|
|
|
|
|
# Sentinel: the user's last known geo location has not been looked up yet.
_GEO_NOT_LOADED = object()


def _cell(row, column):
    """Return the stripped text in *column* of *row*, or "" when it is missing.

    csv.DictReader fills the missing fields of a short row with None, so
    ``row.get(column, "")`` alone can still return None.
    """
    return (row.get(column) or "").strip()


def _last_known_geo_location(user):
    """Return the geo location of *user*'s newest location scrobble, or None."""
    last_loc = (
        Scrobble.objects.filter(
            user=user,
            media_type=Scrobble.MediaType.GEO_LOCATION,
            geo_location__isnull=False,
        )
        .order_by("-timestamp")
        .first()
    )
    return last_loc.geo_location if last_loc else None


def import_birding_csv(file_path, user_id):
    """Import bird sightings from a CSV file as birding scrobbles.

    Rows are grouped by (Location, Observation Date, Start Time); each group
    becomes one Scrobble whose log lists every species in the group.  Groups
    with no location or an unparseable date are skipped, as are groups for
    which the user already has a scrobble at the same timestamp and birding
    location.

    Args:
        file_path: Path of the CSV file to read (opened as UTF-8, BOM allowed).
        user_id: Primary key of the user who owns the imported scrobbles.

    Returns:
        The value returned by ``Scrobble.objects.bulk_create`` for the newly
        built scrobbles.

    Raises:
        User.DoesNotExist: If no user has the given id.
        OSError: If the file cannot be opened.
    """
    user = User.objects.get(id=user_id)

    # Group rows while streaming the file; insertion order of groups is kept.
    groups = defaultdict(list)
    with open(file_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            key = (
                _cell(row, "Location"),
                _cell(row, "Observation Date"),
                _cell(row, "Start Time"),
            )
            groups[key].append(row)

    new_scrobbles = []
    # Looked up lazily and at most once: the query depends only on the user.
    fallback_geo = _GEO_NOT_LOADED

    for (location_str, date_str, time_str), sighting_rows in groups.items():
        if not location_str:
            logger.warning("Skipping rows with no location")
            continue

        timestamp = parse_timestamp(date_str, time_str)
        if timestamp is None:
            continue
        timestamp = user.profile.get_timestamp_with_tz(timestamp)

        # The title is the location text with any "(lat, lon)" suffix removed;
        # fall back to the raw text if nothing is left.
        location_title = (
            LOCATION_COORDS_RE.sub("", location_str).strip().rstrip(",").strip()
            or location_str
        )

        location = BirdingLocation.find_or_create(location_title)
        lat, lon = parse_coords(location_str)
        # Compare against None: 0.0 is a valid latitude/longitude.
        if lat is not None and lon is not None and not location.geo_location:
            from locations.models import GeoLocation

            geo, _ = GeoLocation.objects.get_or_create(
                lat=round(lat, 6),
                lon=round(lon, 6),
                defaults={"altitude": None},
            )
            location.geo_location = geo
            location.save(update_fields=["geo_location"])

        # Check for a duplicate before doing bird and weather lookups so that
        # already-imported sightings cost a single query.
        already_imported = Scrobble.objects.filter(
            timestamp=timestamp,
            birding_location=location,
            user=user,
        ).exists()
        if already_imported:
            logger.debug("Skipping existing scrobble for %s", location)
            continue

        # Checklist-level fields are taken from the first row of the group.
        first_row = sighting_rows[0]

        birds_data = []
        for row in sighting_rows:
            species = _cell(row, "Species")
            if not species:
                continue
            # A missing or non-numeric count is recorded as one bird.
            count = parse_int(row.get("Count")) or 1
            details = _cell(row, "Details")

            bird = Bird.find_or_create(species)
            entry = BirdSightingEntry(
                bird_id=bird.id, quantity=count, sighting_notes=details or None
            )
            birds_data.append(entry.asdict)

        duration_minutes = parse_duration(first_row.get("Duration"))
        logdata = BirdSightingLogData(
            birds=birds_data,
            duration_minutes=duration_minutes,
            observation_type=_cell(first_row, "Observation Type") or None,
            distance=_cell(first_row, "Distance") or None,
            area=_cell(first_row, "Area") or None,
            party_size=parse_int(first_row.get("Party Size")),
            complete_checklist=parse_bool(first_row.get("Complete Checklist")),
        )
        log_dict = logdata.asdict

        # Prefer the birding location's own coordinates for weather; otherwise
        # fall back to the user's most recent location scrobble.
        weather_loc = location.geo_location
        if not weather_loc:
            if fallback_geo is _GEO_NOT_LOADED:
                fallback_geo = _last_known_geo_location(user)
            weather_loc = fallback_geo
        if weather_loc:
            # NOTE(review): current_weather presumably reflects conditions at
            # import time rather than at the sighting time — confirm.
            weather = weather_loc.current_weather
            if weather:
                log_dict["weather"] = weather["description"]
                log_dict["temperature"] = weather["temp"]

        stop_timestamp = (
            timestamp + timedelta(minutes=duration_minutes)
            if duration_minutes
            else None
        )
        tz = getattr(timestamp.tzinfo, "name", None)

        new_scrobbles.append(
            Scrobble(
                user=user,
                timestamp=timestamp,
                timezone=tz,
                stop_timestamp=stop_timestamp,
                source="Birding CSV Import",
                birding_location=location,
                log=log_dict,
                played_to_completion=True,
                in_progress=False,
                media_type=Scrobble.MediaType.BIRDING_LOCATION,
            )
        )

    created = Scrobble.objects.bulk_create(new_scrobbles)
    logger.info("Created %d birding scrobbles", len(created))
    return created
|