Fix KOReader imports to use pages for scrobbles
This commit is contained in:
@ -1,12 +1,17 @@
|
||||
import codecs
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import os
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Iterable, List
|
||||
|
||||
import pytz
|
||||
|
||||
import requests
|
||||
from books.models import Author, Book, Page
|
||||
from pylast import httpx, tempfile
|
||||
from scrobbles.models import Scrobble
|
||||
from stream_sqlite import stream_sqlite
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -34,54 +39,24 @@ class KoReaderPageStatColumn(Enum):
|
||||
TOTAL_PAGES = 4
|
||||
|
||||
|
||||
def process_pages_for_book(book_id, sqlite_file_path):
|
||||
con = sqlite3.connect(sqlite_file_path)
|
||||
cur = con.cursor()
|
||||
|
||||
book = Book.objects.filter(koreader_id=book_id).first()
|
||||
if not book:
|
||||
logger.error(f"No book found with KoReader ID of {book_id}")
|
||||
return
|
||||
|
||||
page_table = cur.execute(
|
||||
f"SELECT * FROM page_stat_data where id_book={book_id}"
|
||||
)
|
||||
new_pages = []
|
||||
for page_row in page_table:
|
||||
page_number = page_row[KoReaderPageStatColumn.PAGE.value]
|
||||
page, page_created = Page.objects.get_or_create(
|
||||
book=book, number=page_number
|
||||
)
|
||||
if page_created:
|
||||
ts = page_row[KoReaderPageStatColumn.START_TIME.value]
|
||||
page.start_time = datetime.utcfromtimestamp(ts).replace(
|
||||
tzinfo=pytz.utc
|
||||
)
|
||||
page.duration_seconds = page_row[
|
||||
KoReaderPageStatColumn.DURATION.value
|
||||
]
|
||||
page.save()
|
||||
new_pages.append(page)
|
||||
logger.info(f"Added {len(new_pages)} for book {book}")
|
||||
return new_pages
|
||||
def _sqlite_bytes(sqlite_url):
|
||||
with httpx.stream("GET", sqlite_url) as r:
|
||||
yield from r.iter_bytes(chunk_size=65_536)
|
||||
|
||||
|
||||
def process_koreader_sqlite_file(sqlite_file_path, user_id):
|
||||
"""Given a sqlite file from KoReader, open the book table, iterate
|
||||
over rows creating scrobbles from each book found"""
|
||||
# Create a SQL connection to our SQLite database
|
||||
con = sqlite3.connect(sqlite_file_path)
|
||||
cur = con.cursor()
|
||||
def get_book_map_from_sqlite(rows: Iterable) -> dict:
|
||||
"""Given an interable of sqlite rows from the books table, lookup existing
|
||||
books, create ones that don't exist, and return a mapping of koreader IDs to
|
||||
primary key IDs for page creation.
|
||||
|
||||
# Return all results of query
|
||||
book_table = cur.execute("SELECT * FROM book")
|
||||
new_scrobbles = []
|
||||
for book_row in book_table:
|
||||
"""
|
||||
book_id_map = {}
|
||||
|
||||
for book_row in rows:
|
||||
authors = book_row[KoReaderBookColumn.AUTHORS.value].split("\n")
|
||||
author_list = []
|
||||
for author_str in authors:
|
||||
logger.debug(f"Looking up author {author_str}")
|
||||
|
||||
if author_str == "N/A":
|
||||
continue
|
||||
|
||||
@ -96,11 +71,11 @@ def process_koreader_sqlite_file(sqlite_file_path, user_id):
|
||||
)
|
||||
|
||||
if created:
|
||||
pages = book_row[KoReaderBookColumn.PAGES.value]
|
||||
run_time = pages * book.AVG_PAGE_READING_SECONDS
|
||||
total_pages = book_row[KoReaderBookColumn.PAGES.value]
|
||||
run_time = total_pages * book.AVG_PAGE_READING_SECONDS
|
||||
book_dict = {
|
||||
"title": book_row[KoReaderBookColumn.TITLE.value],
|
||||
"pages": book_row[KoReaderBookColumn.PAGES.value],
|
||||
"pages": total_pages,
|
||||
"koreader_md5": book_row[KoReaderBookColumn.MD5.value],
|
||||
"koreader_id": int(book_row[KoReaderBookColumn.ID.value]),
|
||||
"koreader_authors": book_row[KoReaderBookColumn.AUTHORS.value],
|
||||
@ -117,6 +92,7 @@ def process_koreader_sqlite_file(sqlite_file_path, user_id):
|
||||
playback_position_seconds = book_row[
|
||||
KoReaderBookColumn.TOTAL_READ_TIME.value
|
||||
]
|
||||
|
||||
pages_read = 0
|
||||
if book_row[KoReaderBookColumn.TOTAL_READ_PAGES.value]:
|
||||
pages_read = int(
|
||||
@ -125,37 +101,123 @@ def process_koreader_sqlite_file(sqlite_file_path, user_id):
|
||||
timestamp = datetime.utcfromtimestamp(
|
||||
book_row[KoReaderBookColumn.LAST_OPEN.value]
|
||||
).replace(tzinfo=pytz.utc)
|
||||
process_pages_for_book(book.koreader_id, sqlite_file_path)
|
||||
book_id_map[book.koreader_id] = book.id
|
||||
|
||||
new_scrobble = Scrobble(
|
||||
book_id=book.id,
|
||||
user_id=user_id,
|
||||
source="KOReader",
|
||||
timestamp=timestamp,
|
||||
playback_position_seconds=playback_position_seconds,
|
||||
played_to_completion=True,
|
||||
in_progress=False,
|
||||
book_pages_read=pages_read,
|
||||
return book_id_map
|
||||
|
||||
|
||||
def build_scrobbles_from_pages(
|
||||
rows: Iterable, book_id_map: dict, user_id: int
|
||||
) -> List[Scrobble]:
|
||||
new_scrobbles = []
|
||||
|
||||
new_scrobbles = []
|
||||
for page_row in rows:
|
||||
koreader_id = page_row[KoReaderPageStatColumn.ID_BOOK.value]
|
||||
page_number = page_row[KoReaderPageStatColumn.PAGE.value]
|
||||
ts = page_row[KoReaderPageStatColumn.START_TIME.value]
|
||||
book_id = book_id_map[koreader_id]
|
||||
|
||||
page, page_created = Page.objects.get_or_create(
|
||||
book_id=book_id, number=page_number, user_id=user_id
|
||||
)
|
||||
if page_created:
|
||||
page.start_time = datetime.utcfromtimestamp(ts).replace(
|
||||
tzinfo=pytz.utc
|
||||
)
|
||||
page.duration_seconds = page_row[
|
||||
KoReaderPageStatColumn.DURATION.value
|
||||
]
|
||||
page.save(update_fields=["start_time", "duration_seconds"])
|
||||
page.refresh_from_db()
|
||||
if page.is_scrobblable:
|
||||
# Page number is a placeholder, we'll re-preocess this after creation
|
||||
logger.debug(
|
||||
f"Queueing scrobble for {page.book}, page {page.number}"
|
||||
)
|
||||
new_scrobble = Scrobble(
|
||||
book_id=page.book_id,
|
||||
user_id=user_id,
|
||||
source="KOReader",
|
||||
timestamp=page.start_time,
|
||||
played_to_completion=True,
|
||||
in_progress=False,
|
||||
book_pages_read=page_number,
|
||||
long_play_complete=False,
|
||||
)
|
||||
new_scrobbles.append(new_scrobble)
|
||||
return new_scrobbles
|
||||
|
||||
existing = Scrobble.objects.filter(
|
||||
timestamp=timestamp, book=book
|
||||
).first()
|
||||
if existing:
|
||||
logger.debug(f"Skipping existing scrobble {new_scrobble}")
|
||||
continue
|
||||
if book.progress_for_user(user_id) >= Book.COMPLETION_PERCENT:
|
||||
new_scrobble.long_play_complete = True
|
||||
|
||||
logger.debug(f"Queued scrobble {new_scrobble} for creation")
|
||||
new_scrobbles.append(new_scrobble)
|
||||
def enrich_koreader_scrobbles(scrobbles: list) -> None:
|
||||
"""Given a list of scrobbles, update pages read, long play seconds and check
|
||||
for media completion"""
|
||||
|
||||
# Be sure to close the connection
|
||||
con.close()
|
||||
for scrobble in scrobbles:
|
||||
if scrobble.next:
|
||||
scrobble.book_pages_read = scrobble.next.book_pages_read - 1
|
||||
scrobble.save(update_fields=["book_pages_read"])
|
||||
else:
|
||||
scrobble.book_pages_read = scrobble.book.page_set.last().number
|
||||
scrobble.long_play_complete =
|
||||
|
||||
created = Scrobble.objects.bulk_create(new_scrobbles)
|
||||
logger.info(
|
||||
f"Created {len(created)} scrobbles",
|
||||
extra={"created_scrobbles": created},
|
||||
)
|
||||
scrobble.save(update_fields=["book_pages_read", "long_play_complete"])
|
||||
|
||||
|
||||
def process_koreader_sqlite_url(file_url, user_id) -> list:
|
||||
book_id_map = {}
|
||||
new_scrobbles = []
|
||||
|
||||
for table_name, pragma_table_info, rows in stream_sqlite(
|
||||
_sqlite_bytes(file_url), max_buffer_size=1_048_576
|
||||
):
|
||||
if table_name == "book":
|
||||
book_id_map = get_book_map_from_sqlite(rows)
|
||||
|
||||
if table_name == "page":
|
||||
new_scrobbles = build_scrobbles_from_pages(
|
||||
rows, book_id_map, user_id
|
||||
)
|
||||
|
||||
created = []
|
||||
if new_scrobbles:
|
||||
created = Scrobble.objects.bulk_create(new_scrobbles)
|
||||
enrich_koreader_scrobbles(created)
|
||||
logger.info(
|
||||
f"Created {len(created)} scrobbles",
|
||||
extra={"created_scrobbles": created},
|
||||
)
|
||||
return created
|
||||
|
||||
|
||||
def process_koreader_sqlite_file(file_path, user_id) -> list:
|
||||
"""Given a sqlite file from KoReader, open the book table, iterate
|
||||
over rows creating scrobbles from each book found"""
|
||||
# Create a SQL connection to our SQLite database
|
||||
con = sqlite3.connect(file_path)
|
||||
cur = con.cursor()
|
||||
|
||||
book_id_map = get_book_map_from_sqlite(cur.execute("SELECT * FROM book"))
|
||||
new_scrobbles = build_scrobbles_from_pages(
|
||||
cur.execute("SELECT * from page_stat_data"), book_id_map, user_id
|
||||
)
|
||||
|
||||
created = []
|
||||
if new_scrobbles:
|
||||
created = Scrobble.objects.bulk_create(new_scrobbles)
|
||||
enrich_koreader_scrobbles(created)
|
||||
logger.info(
|
||||
f"Created {len(created)} scrobbles",
|
||||
extra={"created_scrobbles": created},
|
||||
)
|
||||
return created
|
||||
|
||||
|
||||
def process_koreader_sqlite(file_path: str, user_id: int) -> list:
|
||||
is_os_file = "https://" not in file_path
|
||||
|
||||
if is_os_file:
|
||||
created = process_koreader_sqlite_file(file_path, user_id)
|
||||
else:
|
||||
created = process_koreader_sqlite_url(file_path, user_id)
|
||||
return created
|
||||
|
||||
@ -126,6 +126,9 @@ class BaseFileImportMixin(TimeStampedModel):
|
||||
self.process_count = len(scrobbles)
|
||||
self.save(update_fields=["process_log", "process_count"])
|
||||
|
||||
def upload_file_path(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class KoReaderImport(BaseFileImportMixin):
|
||||
class Meta:
|
||||
@ -144,10 +147,18 @@ class KoReaderImport(BaseFileImportMixin):
|
||||
uuid = instance.uuid
|
||||
return f"koreader-uploads/{uuid}.{extension}"
|
||||
|
||||
@property
|
||||
def upload_file_path(self) -> str:
|
||||
if getattr(settings, "USE_S3_STORAGE"):
|
||||
path = self.sqlite_file.url
|
||||
else:
|
||||
path = self.sqlite_file.path
|
||||
return path
|
||||
|
||||
sqlite_file = models.FileField(upload_to=get_path, **BNULL)
|
||||
|
||||
def process(self, force=False):
|
||||
from books.koreader import process_koreader_sqlite_file
|
||||
from books.koreader import process_koreader_sqlite
|
||||
|
||||
if self.processed_finished and not force:
|
||||
logger.info(
|
||||
@ -156,8 +167,8 @@ class KoReaderImport(BaseFileImportMixin):
|
||||
return
|
||||
|
||||
self.mark_started()
|
||||
scrobbles = process_koreader_sqlite_file(
|
||||
self.sqlite_file.path, self.user.id
|
||||
scrobbles = process_koreader_sqlite(
|
||||
self.upload_file_path, self.user.id
|
||||
)
|
||||
self.record_log(scrobbles)
|
||||
self.mark_finished()
|
||||
@ -180,6 +191,13 @@ class AudioScrobblerTSVImport(BaseFileImportMixin):
|
||||
uuid = instance.uuid
|
||||
return f"audioscrobbler-uploads/{uuid}.{extension}"
|
||||
|
||||
def upload_file_path(self):
|
||||
if getattr(settings, "USE_S3_STORAGE"):
|
||||
path = self.tsv_file.url
|
||||
else:
|
||||
path = self.tsv_file.path
|
||||
return path
|
||||
|
||||
tsv_file = models.FileField(upload_to=get_path, **BNULL)
|
||||
|
||||
def process(self, force=False):
|
||||
@ -198,12 +216,8 @@ class AudioScrobblerTSVImport(BaseFileImportMixin):
|
||||
if self.user:
|
||||
user_id = self.user.id
|
||||
tz = self.user.profile.tzinfo
|
||||
if getattr(settings, "USE_S3_STORAGE"):
|
||||
tsv_str = self.tsv_file.url
|
||||
else:
|
||||
tsv_str = self.tsv_file.path
|
||||
scrobbles = process_audioscrobbler_tsv_file(
|
||||
tsv_str, user_id, user_tz=tz
|
||||
self.upload_file_path, user_id, user_tz=tz
|
||||
)
|
||||
self.record_log(scrobbles)
|
||||
self.mark_finished()
|
||||
@ -457,13 +471,21 @@ class Scrobble(TimeStampedModel):
|
||||
return is_stale
|
||||
|
||||
@property
|
||||
def previous(self):
|
||||
def previous(self) -> "Scrobble":
|
||||
return (
|
||||
self.media_obj.scrobble_set.order_by("-timestamp")
|
||||
.filter(timestamp__lt=self.timestamp)
|
||||
.first()
|
||||
)
|
||||
|
||||
@property
|
||||
def next(self) -> "Scrobble":
|
||||
return (
|
||||
self.media_obj.scrobble_set.order_by("timestamp")
|
||||
.filter(timestamp__gt=self.timestamp)
|
||||
.first()
|
||||
)
|
||||
|
||||
@property
|
||||
def session_pages_read(self) -> Optional[int]:
|
||||
"""Look one scrobble back, if it isn't complete,"""
|
||||
|
||||
Reference in New Issue
Block a user