1055 lines
40 KiB
Python
1055 lines
40 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import html
|
||
import json
|
||
import mimetypes
|
||
import os
|
||
import re
|
||
import socketserver
|
||
import threading
|
||
import webbrowser
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||
from pathlib import Path
|
||
from typing import Iterable
|
||
from urllib.parse import parse_qs, urlencode, urlparse
|
||
from urllib.request import Request, urlopen
|
||
|
||
|
||
@dataclass
class OrgFile:
    """One .org note found on disk, together with the metadata parsed from it."""

    # Location of the file on disk.
    path: Path
    # Path relative to the scanned base directory; lookups match on this value.
    relative_path: str
    # Display title of the note.
    title: str
    # Identifier taken from the note, or None when it has none.
    file_id: str | None
    # Full text of the file.
    content: str
    # Modification time of the file (seconds, as reported by stat()).
    mtime: float = 0.0
    # True when the scanner moved this note to the front of the listing.
    pinned: bool = False
|
||
|
||
|
||
# Filesystem locations, all resolved relative to this module's own directory.
ROOT_DIR = Path(__file__).resolve().parent
TEMPLATE_PATH = ROOT_DIR / "templates" / "index.html"
STATIC_DIR = ROOT_DIR / "static"

# "#+TITLE: ..." keyword line (case-insensitive); group 1 is the title text.
TITLE_RE = re.compile(r"^\s*#\+TITLE:\s*(.+?)\s*$", re.IGNORECASE)
# ":ID: value" property line (case-insensitive); group 1 is the id.
ID_RE = re.compile(r"^\s*:ID:\s*(\S+)\s*$", re.IGNORECASE)
# "[[target]]" or "[[target][label]]"; group 1 is the target, group 2 the label.
ORG_LINK_RE = re.compile(r"\[\[([^\]]+)\](?:\[([^\]]*)\])?\]")
# Bare http(s) URL, ending at whitespace, quotes, angle brackets, parens or brackets.
URL_RE = re.compile(r"https?://[^\s<>'\"()\[\]]+")
# "CREATED: ..." or "#+CREATED: ..." line (case-insensitive); group 1 is the value.
CREATED_LINE_RE = re.compile(r"^\s*(?:#\+)?CREATED:\s*(.+?)\s*$", re.IGNORECASE)
# Org timestamp with positional groups: year, month, day, optional hour, minute.
ORG_TIMESTAMP_RE = re.compile(r"[\[<](\d{4})-(\d{2})-(\d{2})(?:\s+\w{3})?(?:\s+(\d{2}):(\d{2}))?[\]>]")
# ":KEY: value" property line; group 1 is the key, group 2 the value.
PROPERTY_LINE_RE = re.compile(r"^\s*:([A-Za-z_][A-Za-z0-9_-]*):\s+(.+?)\s*$")
# Org timestamp with named groups, including the opening bracket and an
# optional "HH:MM-HH:MM" time range.
INLINE_TIMESTAMP_RE = re.compile(
    r"(?P<bracket>[<\[])"
    r"(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"(?:\s+(?P<dayname>\w{2,3}))?"
    r"(?:\s+(?P<hour>\d{2}):(?P<minute>\d{2})(?:-(?P<end_hour>\d{2}):(?P<end_minute>\d{2}))?)?"
    r"[>\]]"
)

# Workflow keywords recognised at the start of a heading.
TODO_KEYWORDS = {"TODO", "STRT", "DONE", "WAIT"}
# Leading keyword (group 1) followed by the remainder of the heading (group 2).
TODO_KEYWORD_RE = re.compile(r"^(" + "|".join(TODO_KEYWORDS) + r")\b\s*(.*)", re.DOTALL)

DEFAULT_CONFIG_PATH = ROOT_DIR / "config.yaml"
|
||
|
||
|
||
def scan_org_files(base_dir: Path) -> list[OrgFile]:
    """Recursively find .org files under *base_dir* and return their contents.

    Directories named ``.git``, ``.venv``, ``venv`` and ``__pycache__`` are not
    descended into. Files are decoded as UTF-8, with undecodable bytes
    replaced. A file that cannot be read or stat'ed is skipped rather than
    aborting the scan.

    The result is ordered newest-first by modification time. When the
    ``ORGWEB_PIN_TODAY_FILE`` environment variable is ``1``/``true``/``yes``
    (case-insensitive), files whose relative path or title contains today's
    ``YYYY-MM-DD`` date are flagged ``pinned`` and moved to the front.
    """
    skipped_dirs = {".git", ".venv", "venv", "__pycache__"}
    org_files: list[OrgFile] = []

    for root, dirs, files in os.walk(base_dir):
        # Prune in place so os.walk does not descend into these directories.
        dirs[:] = [d for d in dirs if d not in skipped_dirs]
        root_path = Path(root)
        for filename in files:
            if not filename.lower().endswith(".org"):
                continue
            file_path = root_path / filename
            try:
                # errors="replace" yields the same text as a strict decode when
                # the file is valid UTF-8, so no second read is needed.
                content = file_path.read_text(encoding="utf-8", errors="replace")
                mtime = file_path.stat().st_mtime
            except OSError:
                # Broken symlink, permission problem, or the file vanished
                # between listing and reading: leave it out of the listing.
                continue

            org_files.append(
                OrgFile(
                    path=file_path,
                    relative_path=normalize_relative_path(str(file_path.relative_to(base_dir))),
                    title=extract_org_title(content, file_path.stem),
                    file_id=extract_org_id(content),
                    content=content,
                    mtime=mtime,
                )
            )

    org_files.sort(key=lambda f: f.mtime, reverse=True)

    if os.environ.get("ORGWEB_PIN_TODAY_FILE", "").lower() in ("1", "true", "yes"):
        today = datetime.now().strftime("%Y-%m-%d")
        pinned = []
        rest = []
        for f in org_files:
            if today in f.relative_path or today in f.title:
                f.pinned = True
                pinned.append(f)
            else:
                rest.append(f)
        # Both groups keep their newest-first order.
        org_files = pinned + rest

    return org_files
|
||
|
||
|
||
def extract_org_title(content: str, fallback: str) -> str:
    """Return the text of the first line matching TITLE_RE, or *fallback* if none does."""
    candidates = (TITLE_RE.match(line) for line in content.splitlines())
    first_hit = next((hit for hit in candidates if hit), None)
    if first_hit is None:
        return fallback
    return first_hit.group(1).strip()
|
||
|
||
|
||
def extract_org_id(content: str) -> str | None:
    """Return the value of the first line matching ID_RE, or None if no line matches."""
    candidates = (ID_RE.match(line) for line in content.splitlines())
    first_hit = next((hit for hit in candidates if hit), None)
    if first_hit is None:
        return None
    return first_hit.group(1).strip()
|
||
|
||
|
||
def normalize_relative_path(path_value: str) -> str:
    """Return *path_value* as a normalised, forward-slash relative path.

    Backslashes become ``/``, ``.`` and redundant separators are collapsed,
    and leading slashes are dropped. A path that normalises to the current
    directory yields ``""``.

    Leading dots that belong to a name are preserved (``.hidden/a.org`` stays
    as is). Leading ``..`` segments are kept too, so a path that escapes the
    root never aliases a file inside it.
    """
    # Function-scope import: posixpath keeps "/" as the separator on every
    # platform, whereas os.path.normpath would turn it back into "\" on Windows.
    import posixpath

    normalized = posixpath.normpath(path_value.replace("\\", "/"))
    if normalized == ".":
        return ""
    # Strip only separators. lstrip("./") would strip a character *set* and
    # eat the leading dot of hidden files and directories.
    return normalized.lstrip("/")
|
||
|
||
|
||
def extract_org_link_targets(content: str) -> list[str]:
    """Return the stripped target of every ORG_LINK_RE match in *content*, in order."""
    return [found.group(1).strip() for found in ORG_LINK_RE.finditer(content)]
|
||
|
||
|
||
def resolve_link_target(
    source_relative_path: str,
    raw_target: str,
    known_paths: set[str],
    id_to_path: dict[str, str],
) -> str | None:
    """Map a raw link target to the relative path of a known note.

    *source_relative_path* is the note containing the link; relative targets
    are resolved against its directory. *known_paths* is the set of valid
    relative paths and *id_to_path* maps lower-cased ids to relative paths.

    Returns the matching relative path, or None when the target is empty,
    looks like an external link, or does not correspond to a known note.
    """
    target = raw_target.strip()
    if not target:
        return None

    # Keep only the part before any "::" search suffix and "#" anchor.
    target = target.split("::", 1)[0]
    target = target.split("#", 1)[0]

    if target.lower().startswith("id:"):
        return id_to_path.get(target[3:].strip().lower())

    if target.startswith("file:"):
        target = target[5:]
    elif "://" in target:
        # URL with a scheme: not a note.
        return None
    elif ":" in target and not target.endswith(".org"):
        # Some other "scheme:value" link type.
        return None

    if target.startswith("/"):
        unnormalized = target.lstrip("/")
    else:
        unnormalized = os.path.join(os.path.dirname(source_relative_path), target)
    candidate = normalize_relative_path(unnormalized)

    if not candidate:
        return None
    if candidate in known_paths:
        return candidate
    if candidate.endswith(".org"):
        return None

    # Allow links that omit the ".org" extension.
    with_suffix = f"{candidate}.org"
    return with_suffix if with_suffix in known_paths else None
|
||
|
||
|
||
def find_backlinks(org_files: Iterable[OrgFile], selected_path: str) -> list[OrgFile]:
    """Return the notes that contain at least one link resolving to *selected_path*.

    The selected note itself is excluded, each source is reported once, and
    the result is sorted by lower-cased title, then lower-cased relative path.
    """
    files = list(org_files)
    known_paths = {note.relative_path for note in files}
    id_to_path = {note.file_id.lower(): note.relative_path for note in files if note.file_id}

    linking_notes: list[OrgFile] = []
    already_listed: set[str] = set()

    for note in files:
        if note.relative_path == selected_path or note.relative_path in already_listed:
            continue
        points_here = any(
            resolve_link_target(note.relative_path, raw, known_paths, id_to_path) == selected_path
            for raw in extract_org_link_targets(note.content)
        )
        if points_here:
            linking_notes.append(note)
            already_listed.add(note.relative_path)

    linking_notes.sort(key=lambda note: (note.title.lower(), note.relative_path.lower()))
    return linking_notes
|
||
|
||
|
||
def build_backlink_counts(org_files: Iterable[OrgFile]) -> dict[str, int]:
    """Count, for every note, how many *other* notes link to it.

    Returns a dict keyed by relative path (one entry per input file, starting
    at 0). A source contributes at most 1 to each target, however many links
    to that target it contains, and links from a note to itself are ignored.
    """
    files = list(org_files)
    known_paths = {note.relative_path for note in files}
    id_to_path = {note.file_id.lower(): note.relative_path for note in files if note.file_id}
    counts = dict.fromkeys((note.relative_path for note in files), 0)

    for note in files:
        # Distinct resolved targets of this note, excluding unresolved links.
        targets: set[str] = set()
        for raw in extract_org_link_targets(note.content):
            resolved = resolve_link_target(note.relative_path, raw, known_paths, id_to_path)
            if resolved:
                targets.add(resolved)
        targets.discard(note.relative_path)

        for target in targets:
            if target in counts:
                counts[target] += 1

    return counts
|
||
|
||
|
||
def _timestamp_match_to_sort_key(match: re.Match[str]) -> int | None:
    """Pack a timestamp match into one sortable integer of the form YYYYMMDDHHMM.

    Groups 1-3 are read as year, month and day; groups 4 and 5 as hour and
    minute, defaulting to 0 when absent. Returns None if a group is not an
    integer.
    """
    try:
        year, month, day = (int(match.group(index)) for index in (1, 2, 3))
        hour, minute = (int(match.group(index) or "0") for index in (4, 5))
    except ValueError:
        return None
    # Shift two decimal digits per field: year, month, day, hour, minute.
    return (((year * 100 + month) * 100 + day) * 100 + hour) * 100 + minute
|
||
|
||
|
||
def extract_created_sort_key(content: str) -> int | None:
    """Derive a sortable creation key for a note.

    The first line matching CREATED_LINE_RE whose value holds a usable
    timestamp wins. Failing that, the first timestamp anywhere in *content*
    is used. Returns None when no timestamp is found.
    """
    for line in content.splitlines():
        created = CREATED_LINE_RE.match(line)
        if not created:
            continue
        stamp = ORG_TIMESTAMP_RE.search(created.group(1))
        if not stamp:
            continue
        key = _timestamp_match_to_sort_key(stamp)
        if key is not None:
            return key

    fallback = ORG_TIMESTAMP_RE.search(content)
    return _timestamp_match_to_sort_key(fallback) if fallback else None
|
||
|
||
|
||
def note_href(relative_path: str, edit_mode: bool) -> str:
    """Build the root-page URL for a note, adding ``edit=1`` when *edit_mode* is true."""
    query = [("file", relative_path)]
    if edit_mode:
        query.append(("edit", "1"))
    return f"/?{urlencode(query)}"
|
||
|
||
|
||
def truncate_label(text: str, max_chars: int = 32) -> str:
    """Shorten *text* to at most *max_chars* characters, ending in "..." when cut.

    Text that already fits is returned unchanged. When *max_chars* is 3 or
    less there is no room for any text, so only dots are returned.
    """
    if len(text) <= max_chars:
        return text
    if max_chars <= 3:
        return "." * max_chars
    kept = text[: max_chars - 3]
    return kept + "..."
|
||
|
||
|
||
# Month abbreviations indexed by month number; index 0 is a placeholder.
_MONTH_ABBR = ["", *"Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()]


def _format_timestamp(m: re.Match[str]) -> str:
    """Turn an org timestamp match into a styled <time> element.

    *m* must expose the named groups ``bracket``, ``year``, ``month``,
    ``day``, ``dayname``, ``hour``, ``minute``, ``end_hour`` and
    ``end_minute``. A ``<`` opening bracket marks the timestamp as active;
    anything else gets the extra ``org-timestamp-inactive`` class.
    """
    parts = m.groupdict()
    month_number = int(parts["month"])
    day_number = int(parts["day"])

    # Out-of-range months are shown as the raw two-digit text.
    if 1 <= month_number <= 12:
        month_label = _MONTH_ABBR[month_number]
    else:
        month_label = parts["month"]

    date_text = f"{month_label} {day_number}, {parts['year']}"
    if parts["dayname"]:
        date_text = f"{parts['dayname']} {date_text}"

    iso_value = f"{parts['year']}-{parts['month']}-{parts['day']}"
    time_text = ""
    if parts["hour"] is not None:
        time_text = f"{parts['hour']}:{parts['minute']}"
        if parts["end_hour"] is not None:
            time_text += f"–{parts['end_hour']}:{parts['end_minute']}"
        # Only the start time goes into the machine-readable attribute.
        iso_value += f"T{parts['hour']}:{parts['minute']}"

    if parts["bracket"] == "<":
        css = "org-timestamp"
    else:
        css = "org-timestamp org-timestamp-inactive"

    spans = [f"<span class='org-ts-date'>{html.escape(date_text)}</span>"]
    if time_text:
        spans.append(f"<span class='org-ts-time'>{html.escape(time_text)}</span>")

    return f"<time class='{css}' datetime='{iso_value}'>{''.join(spans)}</time>"
|
||
|
||
|
||
# Same shape as INLINE_TIMESTAMP_RE, but with the sentinel characters \x00 and
# \x01 standing in for the opening and closing angle brackets.
_ESCAPED_TS_RE = re.compile(
    r"(?P<bracket>[\x00\[])"
    r"(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"(?:\s+(?P<dayname>\w{2,3}))?"
    r"(?:\s+(?P<hour>\d{2}):(?P<minute>\d{2})(?:-(?P<end_hour>\d{2}):(?P<end_minute>\d{2}))?)?"
    r"[\x01\]]"
)


def _replace_timestamps(escaped_html: str) -> str:
    """Find org timestamps in already-escaped text and wrap them in <time> tags.

    Because the text has been HTML-escaped, the angle brackets of an active
    timestamp appear as the entities ``&lt;`` and ``&gt;``. Those entities are
    temporarily swapped for single sentinel characters so one regex can match
    both active and inactive (square-bracket) timestamps. Entities that are
    not part of a timestamp are restored unchanged, and real markup already
    present in the input (such as anchor tags) is never touched.

    Note: a literal ``\\x00`` or ``\\x01`` in the input would come back as an
    entity, because those characters are used as the sentinels.
    """
    swapped = escaped_html.replace("&lt;", "\x00").replace("&gt;", "\x01")

    def _sub(m: re.Match[str]) -> str:
        # Rebuild the timestamp as it was written so the shared regex and
        # formatter can be reused.
        raw = m.group(0).replace("\x00", "<").replace("\x01", ">")
        ts_match = INLINE_TIMESTAMP_RE.match(raw)
        if ts_match:
            return _format_timestamp(ts_match)
        # Not a timestamp after all: keep the sentinels so the final pass
        # turns them back into entities.
        return m.group(0)

    result = _ESCAPED_TS_RE.sub(_sub, swapped)
    return result.replace("\x00", "&lt;").replace("\x01", "&gt;")
|
||
|
||
|
||
def render_plain_text_with_links(text: str) -> str:
    """Escape *text* for HTML, turning bare URLs into links and timestamps into <time> tags.

    Every URL_RE match becomes an anchor that opens in a new tab; the text
    between matches is HTML-escaped. The joined result is then passed through
    _replace_timestamps.
    """
    pieces: list[str] = []
    position = 0
    for found in URL_RE.finditer(text):
        begin, stop = found.span()
        if begin > position:
            pieces.append(html.escape(text[position:begin]))
        escaped_url = html.escape(found.group(0), quote=True)
        pieces.append(
            "<a class='org-link' href='" + escaped_url
            + "' target='_blank' rel='noopener noreferrer'>" + escaped_url + "</a>"
        )
        position = stop

    remainder = text[position:]
    if remainder:
        pieces.append(html.escape(remainder))
    return _replace_timestamps("".join(pieces))
|
||
|
||
|
||
def _render_heading_with_todo(heading_text: str) -> tuple[str, str | None, str, list[str]]:
    """Split a heading into its TODO keyword, title text and trailing tags.

    Returns ``(rendered_html, keyword_or_None, bare_heading_text, tags_list)``.
    The rendered HTML wraps a recognised keyword in a styled span and escapes
    the remaining text; the tags are not part of it.
    """
    # A trailing ":tag1:tag2:" group is removed before looking for a keyword.
    trailing_tags = _ORG_TAG_RE.search(heading_text)
    if trailing_tags:
        tags = [name for name in trailing_tags.group(1).split(":") if name]
        title_part = heading_text[: trailing_tags.start()].strip()
    else:
        tags = []
        title_part = heading_text

    keyword_hit = TODO_KEYWORD_RE.match(title_part)
    if not keyword_hit:
        return html.escape(title_part), None, title_part.strip(), tags

    keyword = keyword_hit.group(1)
    remainder = keyword_hit.group(2).strip()
    span_class = f"org-todo org-todo-{keyword.lower()}"
    markup = f"<span class='{span_class}'>{html.escape(keyword)}</span> {html.escape(remainder)}"
    return markup, keyword, remainder, tags
|
||
|
||
|
||
def render_line_with_links(
    line: str, source_relative_path: str, known_paths: set[str], id_to_path: dict[str, str]
) -> str:
    """Render one body line, converting ``[[target][label]]`` links to anchors.

    A link that resolves to a known note becomes an internal anchor whose text
    is the label, or the raw target when there is no label. A link that does
    not resolve is rendered as plain text in its original bracket form. Text
    outside links goes through render_plain_text_with_links.
    """
    pieces: list[str] = []
    position = 0

    for found in ORG_LINK_RE.finditer(line):
        begin, stop = found.span()
        if begin > position:
            pieces.append(render_plain_text_with_links(line[position:begin]))

        target_text = found.group(1).strip()
        label_text = (found.group(2) or "").strip()
        note_path = resolve_link_target(source_relative_path, target_text, known_paths, id_to_path)

        if not note_path:
            pieces.append(render_plain_text_with_links(found.group(0)))
        else:
            href = html.escape(note_href(note_path, False), quote=True)
            shown = label_text or target_text
            pieces.append(f"<a class='org-link' href='{href}'>{html.escape(shown)}</a>")

        position = stop

    trailing = line[position:]
    if trailing:
        pieces.append(render_plain_text_with_links(trailing))

    return "".join(pieces)
|
||
|
||
|
||
def render_org_to_html(
    content: str, source_relative_path: str, known_paths: set[str], id_to_path: dict[str, str],
    show_webhook: bool = False,
) -> str:
    """Render org text as HTML, one output element per input line.

    Handled constructs:
      * ``:PROPERTIES:`` ... ``:END:`` drawers become a definition list,
      * ``#+filetags:`` lines are dropped,
      * the ``#+TITLE:`` line becomes an ``<h1>``,
      * ``*`` headings become ``<h2>``..``<h6>`` (one level below the star
        count, capped at 6), followed by their tags and, for TODO headings
        when *show_webhook* is true, a "Start work" button,
      * blank lines become spacer divs and everything else a paragraph with
        links resolved relative to *source_relative_path*.
    """

    def properties_markup(items: list[tuple[str, str]]) -> str:
        # One <dt>/<dd> pair per collected property.
        cells = "".join(
            f"<dt>{html.escape(name)}</dt><dd>{html.escape(value)}</dd>"
            for name, value in items
        )
        return "<dl class='org-properties'>" + cells + "</dl>"

    def heading_markup(star_count: int, text: str) -> list[str]:
        # The heading element, then optional tag chips and webhook button.
        level = min(star_count + 1, 6)
        rendered_title, keyword, bare_text, heading_tags = _render_heading_with_todo(text)
        parts = [f"<h{level}>{rendered_title}</h{level}>"]
        if heading_tags:
            chips = "".join(f"<span class='org-tag'>{html.escape(tag)}</span>" for tag in heading_tags)
            parts.append(f"<div class='org-tags'>{chips}</div>")
        if keyword == "TODO" and show_webhook:
            safe_file = html.escape(source_relative_path, quote=True)
            safe_heading = html.escape(bare_text, quote=True)
            parts.append(
                f"<button class='webhook-send-btn' type='button' "
                f"data-file='{safe_file}' data-heading='{safe_heading}'>"
                f"Start work</button>"
            )
        return parts

    output: list[str] = []
    collecting_properties = False
    pending_properties: list[tuple[str, str]] = []

    for line in content.splitlines():
        stripped = line.lstrip()

        if re.match(r"^\s*:PROPERTIES:\s*$", stripped, re.IGNORECASE):
            collecting_properties = True
            pending_properties = []
            continue

        if collecting_properties:
            if re.match(r"^\s*:END:\s*$", stripped, re.IGNORECASE):
                collecting_properties = False
                if pending_properties:
                    output.append(properties_markup(pending_properties))
                continue
            prop = PROPERTY_LINE_RE.match(stripped)
            if prop:
                pending_properties.append((prop.group(1), prop.group(2)))
                continue
            # A non-property line inside the drawer falls through and is
            # rendered like any other line.

        if re.match(r"^\s*#\+filetags:\s", stripped, re.IGNORECASE):
            continue

        title = TITLE_RE.match(stripped)
        if title:
            output.append(f"<h1>{html.escape(title.group(1))}</h1>")
            continue

        # A heading is one or more leading stars followed by a space.
        star_count = len(stripped) - len(stripped.lstrip("*"))
        if star_count > 0 and len(stripped) > star_count and stripped[star_count] == " ":
            output.extend(heading_markup(star_count, stripped[star_count + 1 :]))
            continue

        if stripped:
            rendered_line = render_line_with_links(line, source_relative_path, known_paths, id_to_path)
            output.append(f"<p>{rendered_line}</p>")
        else:
            output.append("<div class='spacer'></div>")

    return "\n".join(output)
|
||
|
||
|
||
def find_org_file(org_files: Iterable[OrgFile], relative_path: str | None) -> OrgFile | None:
    """Return the first file whose ``relative_path`` equals *relative_path*.

    Returns None when *relative_path* is None or nothing matches.
    """
    if relative_path is None:
        return None
    return next(
        (candidate for candidate in org_files if candidate.relative_path == relative_path),
        None,
    )
|
||
|
||
|
||
# Heading line: group 1 is the run of stars, group 2 the rest of the line.
_HEADING_RE = re.compile(r"^(\*+)\s+(.*)$")
# Trailing ":tag1:tag2:" group; group 1 holds the tags without the outer colons.
_ORG_TAG_RE = re.compile(r"\s+:([\w@:]+):\s*$")
# Line consisting only of ":NAME:", which opens a drawer; group 1 is the name.
_DRAWER_RE = re.compile(r"^\s*:(\w+):\s*$")
# ":END:" line (case-insensitive), which closes a drawer.
_DRAWER_END_RE = re.compile(r"^\s*:END:\s*$", re.IGNORECASE)
# ":KEY: value" line inside a drawer; group 1 is the key, group 2 the value.
_PROP_KV_RE = re.compile(r"^\s*:([A-Za-z_][\w-]*):\s+(.+?)\s*$")
# Timestamp in angle or square brackets; group 1 is the text between them.
_TS_INLINE_RE = re.compile(r"[<\[](\d{4}-\d{2}-\d{2}[^\]>]*)[\]>]")
# "- Note taken on [TIMESTAMP]" list item, optionally ending in a literal "\\";
# group 1 is the timestamp text.
_NOTE_TAKEN_RE = re.compile(
    r"^\s*[-+]\s+Note taken on\s+\[([^\]]+)\]\s*(?:\\\\)?\s*$"
)
|
||
|
||
|
||
def extract_heading_data(content: str, heading_text: str, file_id: str = "") -> dict | None:
    """Find a heading in *content* by its text and extract webhook-ready data.

    The first heading whose title equals *heading_text* is used; the TODO
    keyword and trailing tags are ignored and surrounding whitespace is
    stripped for the comparison. Its section runs up to the next heading of
    the same or a shallower level. Returns None when no heading matches.

    The returned dict carries the heading description, its tags (``labels``),
    the timestamps, notes, drawers and properties found in the section, the
    section body, an ``emacs_id`` (the section's ``ID`` property, else
    *file_id*) and the current UTC time as ``updated_at``.
    """
    lines = content.splitlines()
    wanted = heading_text.strip()

    # Locate the heading, keeping what was parsed so it is not parsed twice.
    located = None
    for idx, raw_line in enumerate(lines):
        heading = _HEADING_RE.match(raw_line)
        if not heading:
            continue
        rest = heading.group(2)
        keyword_hit = TODO_KEYWORD_RE.match(rest)
        title = keyword_hit.group(2) if keyword_hit else rest
        tag_hit = _ORG_TAG_RE.search(title)
        title_without_tags = title[: tag_hit.start()] if tag_hit else title
        if title_without_tags.strip() == wanted:
            located = (idx, len(heading.group(1)), title, tag_hit)
            break

    if located is None:
        return None
    target_idx, target_level, description, tag_hit = located

    tags: list[str] = []
    if tag_hit:
        tags = [name for name in tag_hit.group(1).split(":") if name]
        description = description[: tag_hit.start()].strip()

    # The section ends at the next heading that is not deeper than the target.
    section: list[str] = []
    for candidate in lines[target_idx + 1 :]:
        following = _HEADING_RE.match(candidate)
        if following and len(following.group(1)) <= target_level:
            break
        section.append(candidate)

    properties: dict[str, str] = {}
    drawers: dict[str, list[str]] = {}
    body_lines: list[str] = []
    timestamps: list[str] = []
    open_drawer: str | None = None
    drawer_buffer: list[str] = []

    for section_line in section:
        stripped = section_line.strip()

        if open_drawer is not None:
            if not _DRAWER_END_RE.match(stripped):
                drawer_buffer.append(section_line)
                continue
            # Drawer closed: PROPERTIES is parsed as key/value pairs, any
            # other drawer is kept as raw lines under its upper-cased name.
            if open_drawer == "PROPERTIES":
                for entry in drawer_buffer:
                    pair = _PROP_KV_RE.match(entry)
                    if pair:
                        properties[pair.group(1)] = pair.group(2)
            else:
                drawers[open_drawer] = list(drawer_buffer)
            open_drawer = None
            drawer_buffer = []
            continue

        opener = _DRAWER_RE.match(stripped)
        if opener:
            open_drawer = opener.group(1).upper()
            drawer_buffer = []
            continue

        timestamps.extend(_TS_INLINE_RE.findall(section_line))
        body_lines.append(section_line)

    emacs_id = properties.get("ID", "") or file_id
    notes = _extract_notes(body_lines)

    return {
        "description": description,
        "labels": tags,
        # Always the literal "STRT"; the heading's own keyword is not forwarded.
        "state": "STRT",
        "timestamps": timestamps,
        "notes": notes,
        "drawers": drawers,
        "emacs_id": emacs_id,
        "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "source": "orgweb",
        "properties": properties,
        "body": "\n".join(body_lines).strip(),
    }
|
||
|
||
|
||
def _parse_org_ts_to_iso(raw: str) -> str:
|
||
"""Convert an org timestamp body like '2026-02-12 Thu 16:00' to ISO 8601."""
|
||
m = re.match(
|
||
r"(\d{4})-(\d{2})-(\d{2})(?:\s+\w{2,3})?(?:\s+(\d{2}):(\d{2}))?", raw
|
||
)
|
||
if not m:
|
||
return raw.strip()
|
||
parts = [m.group(1), "-", m.group(2), "-", m.group(3)]
|
||
if m.group(4) is not None:
|
||
parts += ["T", m.group(4), ":", m.group(5)]
|
||
return "".join(parts)
|
||
|
||
|
||
def _extract_notes(body_lines: list[str]) -> list[dict[str, str]]:
    """Extract '- Note taken on [TIMESTAMP]' entries and their content from body lines.

    Each note is a dict with the ISO-formatted ``timestamp`` and the
    ``content`` that follows the note header, up to the next note header or
    the end of the input. One blank line directly after the header is skipped.
    """
    notes: list[dict[str, str]] = []
    total = len(body_lines)
    pos = 0

    while pos < total:
        header = _NOTE_TAKEN_RE.match(body_lines[pos])
        pos += 1
        if header is None:
            continue

        stamp = _parse_org_ts_to_iso(header.group(1))
        # Skip one optional blank line after the note header.
        if pos < total and not body_lines[pos].strip():
            pos += 1

        start = pos
        while pos < total and not _NOTE_TAKEN_RE.match(body_lines[pos]):
            pos += 1

        text = "\n".join(body_lines[start:pos]).strip()
        notes.append({"timestamp": stamp, "content": text})

    return notes
|
||
|
||
|
||
def build_index_page(
    org_files: Iterable[OrgFile],
    selected_path: str | None = None,
    edit_mode: bool = False,
    status_message: str | None = None,
    status_level: str = "success",
    show_webhook: bool = False,
) -> str:
    """Render the main page: file navigation, the selected note, and its backlinks.

    Args:
        org_files: Notes to list; the first one is shown when *selected_path*
            does not match any of them.
        selected_path: Relative path of the note to display.
        edit_mode: Show the note in an editable form instead of rendered HTML.
        status_message: Optional banner text shown above the note.
        status_level: ``"error"`` for an error banner; anything else is
            rendered as success.
        show_webhook: Forwarded to render_org_to_html.

    Returns:
        The template at TEMPLATE_PATH with its ``{{NAV_ITEMS}}``,
        ``{{MAIN_CONTENT}}`` and ``{{BACKLINKS}}`` placeholders filled in.
    """
    org_files = list(org_files)

    if not org_files:
        body = "<p>No .org files were found in this directory.</p>"
        nav = ""
        backlinks_html = "<p class='backlinks-empty'>Open a note to see backlinks.</p>"
    else:
        selected = find_org_file(org_files, selected_path) or org_files[0]
        known_paths = {f.relative_path for f in org_files}
        id_to_path = {f.file_id.lower(): f.relative_path for f in org_files if f.file_id}
        backlinks = find_backlinks(org_files, selected.relative_path)
        backlink_counts = build_backlink_counts(org_files)

        nav_items = []
        for f in org_files:
            active = "active" if f.relative_path == selected.relative_path else ""
            safe_href = html.escape(note_href(f.relative_path, False), quote=True)
            safe_title = html.escape(truncate_label(f.title))
            safe_path = html.escape(truncate_label(f.relative_path))
            full_title = html.escape(f.title, quote=True)
            full_path = html.escape(f.relative_path, quote=True)
            # The data-* attributes below are emitted for the page's scripts.
            searchable = html.escape(f"{f.title} {f.relative_path}".lower(), quote=True)
            backlinks_count = backlink_counts.get(f.relative_path, 0)
            created_key = extract_created_sort_key(f.content)
            created_key_attr = "" if created_key is None else str(created_key)
            modified_ts = str(int(f.mtime))
            pin_icon = "\U0001F4CC " if f.pinned else ""
            display_title = f"{pin_icon}TODAY" if f.pinned else safe_title
            pinned_attr = "server" if f.pinned else ""
            nav_items.append(
                f"<a class='file-link {active}' data-search='{searchable}' "
                f"data-backlinks='{backlinks_count}' data-created-ts='{created_key_attr}' "
                f"data-modified-ts='{modified_ts}' data-pinned='{pinned_attr}' "
                f"data-file-path='{full_path}' href='{safe_href}'>"
                f"<span class='file-title' title='{full_title}'>{display_title}</span>"
                f"<span class='file-path' title='{full_path}'>{safe_path}</span>"
                f"<button class='pin-toggle' type='button' title='Pin/unpin this file' "
                f"aria-label='Pin or unpin this file'>📌</button>"
                "</a>"
            )

        nav = "\n".join(nav_items)
        # The toolbar link switches to the mode that is not currently shown.
        mode_toggle = (
            f"<a class='mode-link' href='{html.escape(note_href(selected.relative_path, False), quote=True)}'>Preview</a>"
            if edit_mode
            else f"<a class='mode-link' href='{html.escape(note_href(selected.relative_path, True), quote=True)}'>Edit</a>"
        )
        status_html = ""
        if status_message:
            safe_message = html.escape(status_message)
            # Whitelist the level so it is safe inside the class attribute.
            safe_level = "error" if status_level == "error" else "success"
            status_html = f"<p class='status status-{safe_level}'>{safe_message}</p>"

        if backlinks:
            backlink_items = []
            for source in backlinks:
                # Backlinks keep the current mode, unlike the navigation links.
                safe_href = html.escape(note_href(source.relative_path, edit_mode), quote=True)
                safe_title = html.escape(truncate_label(source.title))
                safe_path = html.escape(truncate_label(source.relative_path))
                full_title = html.escape(source.title, quote=True)
                full_path = html.escape(source.relative_path, quote=True)
                backlink_items.append(
                    f"<a class='backlink-item' href='{safe_href}'>"
                    f"<span class='file-title' title='{full_title}'>{safe_title}</span>"
                    f"<span class='file-path' title='{full_path}'>{safe_path}</span>"
                    "</a>"
                )
            backlinks_html = "\n".join(backlink_items)
        else:
            backlinks_html = "<p class='backlinks-empty'>No notes link to this note yet.</p>"

        if edit_mode:
            body = (
                f"<h2>Editing {html.escape(selected.relative_path)}</h2>"
                f"<div class='toolbar'>{mode_toggle}</div>"
                f"{status_html}"
                "<form class='editor-form' method='post' action='/edit'>"
                f"<input type='hidden' name='file' value='{html.escape(selected.relative_path, quote=True)}'>"
                f"<textarea class='editor-box' name='content'>{html.escape(selected.content)}</textarea>"
                "<button class='submit-btn' type='submit'>Submit</button>"
                "</form>"
            )
        else:
            body = (
                f"<h2>{html.escape(selected.relative_path)}</h2>"
                f"<div class='toolbar'>{mode_toggle}</div>"
                f"{status_html}"
                f"<article class='org-content'>{render_org_to_html(selected.content, selected.relative_path, known_paths, id_to_path, show_webhook=show_webhook)}</article>"
            )

    template = TEMPLATE_PATH.read_text(encoding="utf-8")
    replacements = {"NAV_ITEMS": nav, "MAIN_CONTENT": body, "BACKLINKS": backlinks_html}
    # Fill every placeholder in one pass over the template. Chained
    # str.replace calls would re-scan text that was just inserted, so a note
    # containing the literal "{{BACKLINKS}}" would get markup spliced into it
    # (including into the edit textarea, corrupting the file on save). A
    # callable replacement also keeps backslashes in the values literal.
    return re.sub(
        r"\{\{(NAV_ITEMS|MAIN_CONTENT|BACKLINKS)\}\}",
        lambda found: replacements[found.group(1)],
        template,
    )
|
||
|
||
|
||
def build_backlinks_page(org_files: Iterable[OrgFile], selected_path: str | None = None) -> str:
    """Return a minimal standalone HTML page containing only the backlinks for a note.

    The note is chosen by *selected_path*, falling back to the first file.
    The page links the shared stylesheet and carries a small script that sets
    the dark theme when the URL has ``theme=dark``.
    """

    def backlink_anchor(source: OrgFile) -> str:
        # One anchor per linking note, with untruncated title and path.
        href = html.escape(note_href(source.relative_path, False), quote=True)
        shown_title = html.escape(source.title)
        shown_path = html.escape(source.relative_path)
        title_attr = html.escape(source.title, quote=True)
        path_attr = html.escape(source.relative_path, quote=True)
        return (
            f"<a class='backlink-item' href='{href}'>"
            f"<span class='file-title' title='{title_attr}'>{shown_title}</span>"
            f"<span class='file-path' title='{path_attr}'>{shown_path}</span>"
            "</a>"
        )

    notes = list(org_files)
    if not notes:
        items_html = "<p>No .org files found.</p>"
    else:
        selected = find_org_file(notes, selected_path) or notes[0]
        linking_notes = find_backlinks(notes, selected.relative_path)
        if linking_notes:
            items_html = "\n".join(backlink_anchor(source) for source in linking_notes)
        else:
            items_html = "<p class='backlinks-empty'>No notes link to this note yet.</p>"

    page_parts = [
        "<!doctype html>\n<html lang='en'>\n<head>\n",
        " <meta charset='utf-8'>\n",
        " <meta name='viewport' content='width=device-width, initial-scale=1'>\n",
        " <title>Backlinks</title>\n",
        " <link rel='stylesheet' href='/static/style.css'>\n",
        "</head>\n<body>\n",
        " <div class='backlinks-pane' style='border:none;height:100vh;'>\n",
        " <h3>Backlinks</h3>\n",
        f" <div class='backlinks-list'>{items_html}</div>\n",
        " </div>\n",
        " <script>(function(){",
        "var t=new URLSearchParams(window.location.search).get('theme');",
        "if(t==='dark')document.body.setAttribute('data-theme','dark');",
        "})()</script>\n",
        "</body>\n</html>",
    ]
    return "".join(page_parts)
|
||
|
||
|
||
def make_handler(base_dir: Path, webhook_url: str = "", webhook_token: str = ""):
|
||
class OrgRequestHandler(BaseHTTPRequestHandler):
|
||
def do_GET(self) -> None:
|
||
parsed = urlparse(self.path)
|
||
|
||
if parsed.path.startswith("/static/"):
|
||
self.serve_static(parsed.path)
|
||
return
|
||
|
||
if parsed.path != "/":
|
||
self.send_error(404, "Not found")
|
||
return
|
||
|
||
params = parse_qs(parsed.query)
|
||
selected_path = params.get("file", [None])[0]
|
||
embed_backlinks = params.get("embed_backlinks", ["0"])[0] in ("1", "true", "yes")
|
||
|
||
if embed_backlinks:
|
||
org_files = scan_org_files(base_dir)
|
||
html_page = build_backlinks_page(org_files, selected_path=selected_path)
|
||
encoded = html_page.encode("utf-8")
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.send_header("Content-Length", str(len(encoded)))
|
||
self.end_headers()
|
||
self.wfile.write(encoded)
|
||
return
|
||
|
||
edit_mode = params.get("edit", ["0"])[0] == "1"
|
||
saved = params.get("saved", ["0"])[0] == "1"
|
||
error = params.get("error", [""])[0]
|
||
status_message = None
|
||
status_level = "success"
|
||
if saved:
|
||
status_message = "Saved successfully."
|
||
elif error == "missing":
|
||
status_message = "Select a valid .org file first."
|
||
status_level = "error"
|
||
elif error == "write":
|
||
status_message = "Could not write this file."
|
||
status_level = "error"
|
||
|
||
org_files = scan_org_files(base_dir)
|
||
html_page = build_index_page(
|
||
org_files,
|
||
selected_path=selected_path,
|
||
edit_mode=edit_mode,
|
||
status_message=status_message,
|
||
status_level=status_level,
|
||
show_webhook=bool(webhook_url and webhook_token),
|
||
)
|
||
encoded = html_page.encode("utf-8")
|
||
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.send_header("Content-Length", str(len(encoded)))
|
||
self.end_headers()
|
||
self.wfile.write(encoded)
|
||
|
||
def do_POST(self) -> None:
|
||
parsed = urlparse(self.path)
|
||
if parsed.path == "/webhook":
|
||
self.handle_webhook()
|
||
return
|
||
if parsed.path != "/edit":
|
||
self.send_error(404, "Not found")
|
||
return
|
||
|
||
content_length = int(self.headers.get("Content-Length", "0"))
|
||
body = self.rfile.read(content_length).decode("utf-8", errors="replace")
|
||
params = parse_qs(body)
|
||
selected_path = params.get("file", [None])[0]
|
||
new_content = params.get("content", [""])[0]
|
||
|
||
org_files = scan_org_files(base_dir)
|
||
selected = find_org_file(org_files, selected_path)
|
||
if selected is None:
|
||
self.redirect_with_query("/", {"edit": "1", "error": "missing"})
|
||
return
|
||
|
||
try:
|
||
selected.path.write_text(new_content, encoding="utf-8")
|
||
except OSError:
|
||
self.redirect_with_query("/", {"file": selected.relative_path, "edit": "1", "error": "write"})
|
||
return
|
||
|
||
self.redirect_with_query("/", {"file": selected.relative_path, "edit": "1", "saved": "1"})
|
||
|
||
def handle_webhook(self) -> None:
|
||
if not webhook_url:
|
||
self.send_json_response(400, {"error": "No webhook_url configured"})
|
||
return
|
||
|
||
content_length = int(self.headers.get("Content-Length", "0"))
|
||
body = self.rfile.read(content_length).decode("utf-8", errors="replace")
|
||
try:
|
||
payload = json.loads(body)
|
||
except json.JSONDecodeError:
|
||
self.send_json_response(400, {"error": "Invalid JSON"})
|
||
return
|
||
|
||
file_path = payload.get("file", "")
|
||
heading_text = payload.get("heading", "")
|
||
if not file_path or not heading_text:
|
||
self.send_json_response(400, {"error": "Missing file or heading"})
|
||
return
|
||
|
||
org_files = scan_org_files(base_dir)
|
||
org_file = find_org_file(org_files, file_path)
|
||
if org_file is None:
|
||
self.send_json_response(404, {"error": "File not found"})
|
||
return
|
||
|
||
data = extract_heading_data(org_file.content, heading_text, file_id=org_file.file_id or "")
|
||
if data is None:
|
||
self.send_json_response(404, {"error": "Heading not found"})
|
||
return
|
||
|
||
print(json.dumps(data))
|
||
encoded_data = json.dumps(data).encode("utf-8")
|
||
headers = {"Content-Type": "application/json"}
|
||
if webhook_token:
|
||
headers["Authorization"] = f"Token {webhook_token}"
|
||
|
||
try:
|
||
req = Request(webhook_url, data=encoded_data, headers=headers, method="POST")
|
||
with urlopen(req, timeout=10) as resp:
|
||
resp_body = resp.read().decode("utf-8", errors="replace")
|
||
self.send_json_response(200, {"ok": True, "response": resp_body})
|
||
except Exception as exc:
|
||
self.send_json_response(502, {"error": f"Webhook failed: {exc}"})
|
||
|
||
def send_json_response(self, status: int, data: dict) -> None:
    """Send *data* as a JSON response body with HTTP status *status*.

    The body is the UTF-8 encoding of ``json.dumps(data)``; ``Content-Type``
    and ``Content-Length`` headers are sent before it.
    """
    payload = json.dumps(data).encode("utf-8")
    response_headers = (
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(payload))),
    )

    self.send_response(status)
    for header_name, header_value in response_headers:
        self.send_header(header_name, header_value)
    self.end_headers()
    self.wfile.write(payload)
|
||
|
||
def redirect_with_query(self, path: str, params: dict[str, str]) -> None:
    """Send a 303 redirect to *path* with *params* URL-encoded as the query.

    The response has an explicit zero ``Content-Length`` and no body.
    """
    query_string = urlencode(params)
    self.send_response(303)
    self.send_header("Location", path + "?" + query_string)
    self.send_header("Content-Length", "0")
    self.end_headers()
|
||
|
||
def serve_static(self, request_path: str) -> None:
    """Serve one file from ``STATIC_DIR`` for a ``/static/...`` request path.

    Responses sent to the client:
        403 -- the resolved path lies outside ``STATIC_DIR``.
        404 -- the path cannot be resolved, is not a regular file, or the
               file cannot be read.
        200 -- the file bytes, with a ``Content-Type`` guessed from the file
               name (``application/octet-stream`` when unknown).
    """
    static_root = STATIC_DIR.resolve()
    rel_path = request_path[len("/static/"):]

    # Resolve before comparing so ".." segments and symlinks cannot be used to
    # escape the static root. Resolution itself can fail on a malformed path
    # (e.g. an embedded NUL byte); treat that as "no such file".
    try:
        candidate = (STATIC_DIR / rel_path).resolve()
    except (OSError, ValueError):
        self.send_error(404, "Not found")
        return

    if candidate != static_root and static_root not in candidate.parents:
        self.send_error(403, "Forbidden")
        return
    if not candidate.is_file():
        self.send_error(404, "Not found")
        return

    # The file may disappear or be unreadable between the check above and the
    # read; answer 404 instead of letting the exception escape the handler.
    try:
        data = candidate.read_bytes()
    except OSError:
        self.send_error(404, "Not found")
        return

    content_type = mimetypes.guess_type(str(candidate))[0] or "application/octet-stream"
    self.send_response(200)
    self.send_header("Content-Type", content_type)
    self.send_header("Content-Length", str(len(data)))
    self.end_headers()
    self.wfile.write(data)
|
||
|
||
def log_message(self, fmt: str, *args) -> None:
    """Ignore the log format and its arguments; nothing is written."""
    return None
|
||
|
||
return OrgRequestHandler
|
||
|
||
|
||
def serve(base_dir: Path, host: str, port: int, open_browser: bool = True,
          webhook_url: str = "", webhook_token: str = "") -> None:
    """Run the threaded HTTP server until interrupted.

    Args:
        base_dir: Directory passed to ``make_handler`` and reported at startup.
        host: Address to bind.
        port: Port to bind; the port actually bound is read back from the
            server and used in the printed URL.
        open_browser: When true, open the URL in a browser shortly after
            startup.
        webhook_url: Forwarded to ``make_handler``.
        webhook_token: Forwarded to ``make_handler``.

    ``KeyboardInterrupt`` stops the server cleanly; the listening socket is
    closed in every case.
    """
    handler_cls = make_handler(base_dir, webhook_url=webhook_url, webhook_token=webhook_token)

    class ReusableHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
        daemon_threads = True
        allow_reuse_address = True

    server = ReusableHTTPServer((host, port), handler_cls)
    actual_port = server.server_address[1]

    # A wildcard bind address means "all interfaces"; it is not a reliable
    # destination for a browser, so point the URL at loopback instead.
    browse_host = "127.0.0.1" if host in ("", "0.0.0.0") else host
    url = f"http://{browse_host}:{actual_port}/"
    print(f"Serving org files from {base_dir}")
    print(f"Open {url}")

    if open_browser:
        # Short delay so the server is already accepting connections. Daemon,
        # so a pending timer never opens a browser after the server has exited.
        opener = threading.Timer(0.4, webbrowser.open, args=(url,))
        opener.daemon = True
        opener.start()

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        server.server_close()
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse the process arguments.

    Options: ``--config`` (defaults to ``DEFAULT_CONFIG_PATH``), ``--dir``
    (defaults to ``notes``), ``--host`` and ``--port`` (both default to
    ``None``), and the ``--no-browser`` flag.
    """
    cli = argparse.ArgumentParser(description="Serve local .org files in a browser-friendly web view.")

    config_help = f"Path to config YAML file (default: {DEFAULT_CONFIG_PATH})"
    cli.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help=config_help)
    cli.add_argument("--dir", default="notes", help="Directory to scan for .org files (default: notes)")
    cli.add_argument("--host", default=None, help="Host/IP to bind the web server (overrides config bind_addr)")
    cli.add_argument("--port", type=int, default=None, help="Port to bind (overrides config bind_port)")
    cli.add_argument("--no-browser", action="store_true", help="Don't auto-open the browser")

    return cli.parse_args()
|
||
|
||
|
||
def _strip_quotes(value: str) -> str:
|
||
if len(value) >= 2 and ((value[0] == value[-1] == "'") or (value[0] == value[-1] == '"')):
|
||
return value[1:-1]
|
||
return value
|
||
|
||
|
||
def _strip_inline_comment(value: str) -> str:
    """Return *value* with a trailing ``# comment`` removed.

    A ``#`` starts a comment only when it is outside a quoted value and is the
    first character or follows whitespace, so ``"a#b"`` and ``http://x/#frag``
    are kept intact. Quotes are tracked only when the value itself starts with
    one; backslash escapes inside quotes are not interpreted.
    """
    open_quote = ""
    for index, char in enumerate(value):
        if open_quote:
            if char == open_quote:
                open_quote = ""
        elif index == 0 and char in "'\"":
            open_quote = char
        elif char == "#" and (index == 0 or value[index - 1].isspace()):
            return value[:index].rstrip()
    return value


def load_runtime_config(config_path: Path) -> dict[str, str | int]:
    """Load bind settings from a flat ``key: value`` file plus the environment.

    The file is read line by line: blank lines, lines starting with ``#`` and
    lines without ``:`` or with an empty key are skipped; inline comments and
    one pair of surrounding quotes are removed from each value. A missing
    file simply yields the defaults.

    Args:
        config_path: Path of the config file.

    Returns:
        A dict with ``bind_addr`` (default ``"127.0.0.1"``), ``bind_port``
        (int, default ``8000``), and ``webhook_url`` / ``webhook_token`` read
        from the ``ORGWEB_WEBHOOK_URL`` / ``ORGWEB_WEBHOOK_TOKEN`` environment
        variables (default ``""``).

    Raises:
        ValueError: ``bind_port`` is not an integer or is outside 1-65535.
    """
    config: dict[str, str] = {}
    if config_path.exists():
        text = config_path.read_text(encoding="utf-8")
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            key = key.strip()
            if not key:
                continue
            # Drop the comment before unquoting, and only where it really is a
            # comment, so a "#" inside the value is not mistaken for one.
            value = _strip_inline_comment(value.strip())
            config[key] = _strip_quotes(value)

    bind_addr = config.get("bind_addr", "127.0.0.1")
    bind_port_raw = config.get("bind_port", "8000")
    try:
        bind_port = int(bind_port_raw)
    except ValueError as exc:
        raise ValueError(f"Invalid bind_port in {config_path}: {bind_port_raw!r}") from exc
    if not 1 <= bind_port <= 65535:
        raise ValueError(f"bind_port out of range in {config_path}: {bind_port}")

    # Webhook settings are taken from the environment only, never from the file.
    webhook_url = os.environ.get("ORGWEB_WEBHOOK_URL", "")
    webhook_token = os.environ.get("ORGWEB_WEBHOOK_TOKEN", "")

    return {
        "bind_addr": bind_addr,
        "bind_port": bind_port,
        "webhook_url": webhook_url,
        "webhook_token": webhook_token,
    }
|
||
|
||
|
||
def main() -> None:
    """Parse the command line, load the config file and start the server.

    ``--host`` and ``--port`` take precedence over the config values when
    they are given on the command line.
    """
    args = parse_args()
    config = load_runtime_config(Path(args.config))
    base_dir = Path(args.dir).resolve()

    if args.host is None:
        host = str(config["bind_addr"])
    else:
        host = args.host

    if args.port is None:
        port = int(config["bind_port"])
    else:
        port = args.port

    serve(
        base_dir,
        host,
        port,
        open_browser=not args.no_browser,
        webhook_url=str(config.get("webhook_url", "")),
        webhook_token=str(config.get("webhook_token", "")),
    )
|
||
|
||
|
||
# Start the server only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()
|