Files
vrobbler/vrobbler/apps/scrobbles/importers/trail_gpx.py
Colin Powell c8292d1c06
Some checks failed
build / test (push) Has been cancelled
[scrobbles] Back fill visibility field
2026-06-09 13:15:57 -04:00

336 lines
11 KiB
Python

import json
import logging
import os
import tempfile
from datetime import timezone as dt_timezone
from math import asin, cos, radians, sin, sqrt
from typing import Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.files import File
from locations.models import GeoLocation
from scrobbles.models import Scrobble
from scrobbles.notifications import ScrobbleNtfyNotification
from trails.models import Trail, TrailLogData
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# User model class as resolved by Django's get_user_model().
User = get_user_model()
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two coordinates.

    All four arguments are decimal degrees. The distance is computed with
    the haversine formula on a sphere of radius 6,371,000 m.
    """
    R = 6371000  # sphere radius in metres
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    # Rounding can push `a` a hair outside [0, 1] for (near-)antipodal
    # points, which would make asin() raise ValueError; clamp it first.
    a = min(1.0, max(0.0, a))
    return R * 2 * asin(sqrt(a))
def find_route_waypoint(points):
    """Return the (lat, lon) of the track point at the route's halfway mark.

    ``points`` is a sequence of tuples whose first two items are latitude
    and longitude. Segments where either endpoint lacks a coordinate are
    ignored when measuring distance.

    Returns ``None`` for an empty sequence. Otherwise returns the end point
    of the first measured segment at which the running distance reaches
    half of the total, falling back to the coordinates of the last point
    when no segment qualifies (e.g. a single point).
    """
    if not points:
        return None

    # Measure every usable segment exactly once and remember where it ends,
    # so the midpoint search below does not repeat the haversine work.
    segments = []
    total = 0.0
    for prev, curr in zip(points, points[1:]):
        lat1, lon1 = prev[0], prev[1]
        lat2, lon2 = curr[0], curr[1]
        if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
            continue
        length = haversine(lat1, lon1, lat2, lon2)
        total += length
        segments.append((length, lat2, lon2))

    halfway = total / 2.0
    cumulative = 0.0
    for length, lat, lon in segments:
        cumulative += length
        if cumulative >= halfway:
            return (lat, lon)
    return (points[-1][0], points[-1][1])
def compute_trail_stats(points):
    """Summarise a track as distance, climb, elapsed time and average speed.

    ``points`` is a sequence of ``(lat, lon, elevation, time)`` tuples; any
    item may be ``None``. Consecutive points are compared pairwise:

    * distance is accumulated only when both points have coordinates;
    * climb is accumulated only for strictly rising elevation pairs;
    * the time span runs from the first non-``None`` time seen as a
      pair's leading point to the last non-``None`` trailing time.

    Returns a dict with ``distance_km``, ``elevation_gain_m``,
    ``moving_time_seconds`` and ``avg_speed_kmh``.
    """
    total_m = 0.0
    climb_m = 0.0
    start = None
    end = None

    for before, after in zip(points, points[1:]):
        lat_a, lon_a, ele_a, time_a = before
        lat_b, lon_b, ele_b, time_b = after

        coords = (lat_a, lon_a, lat_b, lon_b)
        if all(value is not None for value in coords):
            total_m += haversine(lat_a, lon_a, lat_b, lon_b)

        if ele_a is not None and ele_b is not None and ele_b > ele_a:
            climb_m += ele_b - ele_a

        if start is None and time_a is not None:
            start = time_a
        if time_b is not None:
            end = time_b

    elapsed = 0
    if start and end:
        elapsed = (end - start).total_seconds()

    if elapsed > 0:
        speed_kmh = (total_m / 1000) / (elapsed / 3600)
    else:
        speed_kmh = 0.0

    stats = {}
    stats["distance_km"] = round(total_m / 1000, 2)
    stats["elevation_gain_m"] = round(climb_m, 1)
    stats["moving_time_seconds"] = int(elapsed)
    stats["avg_speed_kmh"] = round(speed_kmh, 2)
    return stats
def parse_trackpoints(file_path):
    """Parse a track file, choosing the parser from the file extension.

    ``.gpx`` files go to ``_parse_gpx`` and ``.fit`` files to
    ``_parse_fit``; the extension check is case-insensitive.

    Raises:
        ValueError: if the extension is neither ``.gpx`` nor ``.fit``.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in (".gpx", ".fit"):
        raise ValueError(f"Unsupported file type: {ext}")
    if ext == ".gpx":
        return _parse_gpx(file_path)
    return _parse_fit(file_path)
def _gpx_comment_extra(comment):
    """Pull activity metadata out of a track comment holding a JSON object.

    Returns a dict of whatever fields could be extracted. A comment that is
    not JSON, or is JSON but not an object, yields an empty dict. If a value
    has an unusable type part-way through, the fields gathered so far are
    returned.
    """
    extra = {}
    try:
        cmt_data = json.loads(comment)
    except (json.JSONDecodeError, TypeError):
        return extra
    # json.loads may legitimately return a list, str, number, bool or None;
    # only an object has the keys read below.
    if not isinstance(cmt_data, dict):
        return extra

    data = cmt_data.get("activity_dict") or cmt_data
    if not isinstance(data, dict):
        data = cmt_data

    try:
        hr_avg = data.get("average_heartrate")
        if hr_avg is not None:
            extra["avg_heartrate"] = int(round(hr_avg))
        hr_max = data.get("max_heartrate")
        if hr_max is not None:
            extra["max_heartrate"] = int(round(hr_max))
        extra["calories"] = data.get("calories")
        extra["moving_time_seconds"] = data.get("moving_time")
        extra["total_elevation_gain_m"] = data.get("total_elevation_gain")
        avg_speed = data.get("average_speed")
        if avg_speed is not None:
            extra["avg_speed_kmh"] = round(avg_speed * 3.6, 2)
        extra["activity_type"] = cmt_data.get("type") or data.get("type")
    except (TypeError, ValueError, OverflowError):
        # Non-numeric, NaN or infinite values: keep what was gathered.
        pass
    return extra


def _parse_gpx(file_path):
    """Parse a GPX file into trackpoints plus track-level metadata.

    Returns a dict with:

    * ``points``: ``(latitude, longitude, elevation, time)`` tuples for
      every point of every segment of every track, in file order;
    * ``name``: the first non-empty track name, or the file's base name
      without extension when no track is named;
    * ``description``: the first non-empty track description, or ``None``;
    * ``extra``: metadata decoded from the first track comment that yields
      any (see ``_gpx_comment_extra``), otherwise an empty dict.
    """
    import gpxpy

    with open(file_path) as f:
        gpx = gpxpy.parse(f)

    points = []
    track_name = None
    description = None
    extra = {}
    for track in gpx.tracks:
        if track.name and not track_name:
            track_name = track.name
        if track.description and not description:
            description = track.description
        # Only consult comments until one of them has produced metadata.
        if track.comment and not extra:
            extra = _gpx_comment_extra(track.comment)
        for seg in track.segments:
            for pt in seg.points:
                points.append((pt.latitude, pt.longitude, pt.elevation, pt.time))

    return {
        "points": points,
        "name": track_name or os.path.splitext(os.path.basename(file_path))[0],
        "description": description,
        "extra": extra,
    }
def _parse_fit(file_path):
    """Parse a FIT file into trackpoints plus session-level metadata.

    Returns a dict with:

    * ``points``: ``(lat, lon, altitude, timestamp)`` tuples, one per
      ``record`` message that carries both a latitude and a longitude;
    * ``name``: the session's ``sport`` value as a string when present,
      otherwise the file's base name without extension;
    * ``description``: always ``None``;
    * ``extra``: summary fields copied from ``session`` messages
      (distance, time, ascent, heart rate, speed, calories).
    """
    import fitparse

    fitfile = fitparse.FitFile(file_path)
    messages = list(fitfile.get_messages("record"))
    points = []
    track_name = os.path.splitext(os.path.basename(file_path))[0]
    extra = {}

    session_msgs = list(fitfile.get_messages("session"))
    for msg in session_msgs:
        for field in msg:
            name, value = field.name, field.value
            # Empty / zero summary values are skipped, leaving the key unset.
            if not value:
                continue
            if name == "sport":
                track_name = str(value)
            elif name == "total_distance":
                extra["distance_km"] = round(value / 1000, 2)
            elif name == "total_timer_time":
                extra["moving_time_seconds"] = int(value)
            elif name == "total_elapsed_time":
                # Only a fallback: never overrides total_timer_time.
                extra.setdefault("moving_time_seconds", int(value))
            elif name == "total_ascent":
                extra["total_elevation_gain_m"] = value
            elif name == "avg_heart_rate":
                extra["avg_heartrate"] = int(round(value))
            elif name == "max_heart_rate":
                extra["max_heartrate"] = int(round(value))
            elif name == "avg_speed":
                extra["avg_speed_kmh"] = round(value * 3.6, 2)
            elif name == "total_calories":
                extra["calories"] = int(value)

    # Scale factor applied to raw position values to obtain degrees
    # (presumably FIT "semicircle" units -- confirm against the FIT spec).
    to_degrees = 180.0 / (2**31)
    for msg in messages:
        lat = None
        lon = None
        ele = None
        t = None
        for field in msg:
            value = field.value
            if field.name == "position_lat":
                # Compare against None explicitly: a raw value of 0 is a
                # real coordinate (the equator), not a missing one.
                lat = value * to_degrees if value is not None else None
            elif field.name == "position_long":
                # Same for 0 longitude (the prime meridian).
                lon = value * to_degrees if value is not None else None
            elif field.name == "altitude":
                ele = value
            elif field.name == "timestamp":
                # Tag the timestamp as UTC without shifting its clock time.
                t = value.replace(tzinfo=dt_timezone.utc) if value else None
        if lat is not None and lon is not None:
            points.append((lat, lon, ele, t))

    return {
        "points": points,
        "name": track_name,
        "description": None,
        "extra": extra,
    }
def convert_fit_to_gpx(file_path):
    """Convert a FIT file to a temporary GPX file and return its path.

    The FIT file is read with ``_parse_fit`` and its points are written as
    a single track with a single segment. The temporary file is created
    with ``delete=False``, so the caller is responsible for removing it.
    """
    import gpxpy.gpx

    result = _parse_fit(file_path)
    points = result["points"]
    track_name = result["name"]

    gpx = gpxpy.gpx.GPX()
    gpx_track = gpxpy.gpx.GPXTrack(name=track_name)
    gpx.tracks.append(gpx_track)
    gpx_seg = gpxpy.gpx.GPXTrackSegment()
    gpx_track.segments.append(gpx_seg)
    for lat, lon, ele, t in points:
        gpx_seg.points.append(gpxpy.gpx.GPXTrackPoint(lat, lon, elevation=ele, time=t))

    # Serialise first so a failure here does not leave an empty temp file.
    xml = gpx.to_xml()

    # The context manager closes the handle even if the write fails, and
    # the explicit encoding keeps non-ASCII track names independent of the
    # platform's default text encoding.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".gpx", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write(xml)
    return tmp.name
def import_trail_gpx(file_path, user_id, original_filename=None):
    """Import a GPX or FIT track as a trail scrobble for a user.

    Args:
        file_path: Path of the ``.gpx`` or ``.fit`` file to import.
        user_id: Primary key of the user the scrobble belongs to.
        original_filename: Optional name to store the attached file under;
            defaults to the base name of ``file_path`` for GPX input and to
            ``trail`` (plus ``.gpx``) for FIT input.

    Returns:
        The list returned by ``Scrobble.objects.bulk_create`` (one
        scrobble), or an empty list when the file has no trackpoints, its
        first point has no timestamp, or a scrobble already exists for the
        same user, trail and start time.

    Raises:
        ValueError: from ``parse_trackpoints`` for unsupported extensions.
    """
    user = User.objects.get(id=user_id)
    new_scrobbles = []

    result = parse_trackpoints(file_path)
    points = result["points"]
    track_name = result["name"]
    description = result.get("description")
    extra = result.get("extra", {})

    if not points:
        logger.warning(f"No trackpoints found in {file_path}")
        return []

    first_lat, first_lon, _, first_time = points[0]
    _, _, _, last_time = points[-1]

    if first_time is None:
        logger.warning(f"No timestamps in {file_path}")
        return []

    route_pt = find_route_waypoint(points)
    route_lat = route_pt[0] if route_pt else None
    route_lon = route_pt[1] if route_pt else None

    # The trailhead is the first trackpoint, stored at 6 decimal places.
    geo, _ = GeoLocation.objects.get_or_create(
        lat=round(first_lat, 6),
        lon=round(first_lon, 6),
        defaults={"altitude": None},
    )

    # Reuse a known trail that matches this trailhead/route, else create one.
    trail = Trail.find_by_trailhead(
        first_lat, first_lon,
        route_lat=route_lat,
        route_lon=route_lon,
        tolerance_m=100,
    )
    if not trail:
        trail = Trail.objects.create(
            title=track_name,
            trailhead_location=geo,
            route_lat=route_lat,
            route_lon=route_lon,
        )

    timestamp = first_time
    stop_timestamp = last_time

    # Same user, trail and start time means this track was already imported.
    existing = Scrobble.objects.filter(
        timestamp=timestamp,
        trail=trail,
        user=user,
    ).first()
    if existing:
        logger.debug(f"Skipping existing scrobble for trail {trail}")
        return []

    # Distance, climb, time and speed come from the trackpoints themselves;
    # the parsed metadata only supplies activity type, heart rate, calories.
    computed = compute_trail_stats(points)
    logdata = TrailLogData(
        description=description,
        distance_km=computed["distance_km"],
        elevation_gain_m=computed["elevation_gain_m"],
        moving_time_seconds=computed["moving_time_seconds"],
        activity_type=extra.get("activity_type"),
        avg_speed_kmh=computed["avg_speed_kmh"],
        avg_heartrate=extra.get("avg_heartrate"),
        max_heartrate=extra.get("max_heartrate"),
        calories=extra.get("calories"),
    )

    duration = int((stop_timestamp - timestamp).total_seconds()) if stop_timestamp else 0

    scrobble = Scrobble(
        user=user,
        timestamp=timestamp,
        stop_timestamp=stop_timestamp,
        playback_position_seconds=duration,
        source="GPX Import",
        trail=trail,
        log=logdata.asdict,
        timezone=user.profile.timezone or settings.TIME_ZONE,
        played_to_completion=True,
        in_progress=False,
        media_type=Scrobble.MediaType.TRAIL,
        visibility="private",
    )

    _, ext = os.path.splitext(file_path)
    if ext.lower() == ".fit":
        # FIT uploads are stored as GPX: convert to a temp file, attach it,
        # and always remove the temp file -- even if attaching fails.
        gpx_path = convert_fit_to_gpx(file_path)
        try:
            with open(gpx_path, "rb") as f:
                scrobble.gpx_file.save(
                    f"{original_filename or 'trail'}.gpx",
                    File(f),
                    save=False,
                )
        finally:
            os.unlink(gpx_path)
    else:
        with open(file_path, "rb") as f:
            scrobble.gpx_file.save(
                original_filename or os.path.basename(file_path),
                File(f),
                save=False,
            )

    new_scrobbles.append(scrobble)
    created = Scrobble.objects.bulk_create(new_scrobbles)
    logger.info(f"Created {len(created)} trail scrobbles")

    for scrobble in created:
        ScrobbleNtfyNotification(scrobble).send()

    return created