[bin] Got things mostly working in checkworkmail.py

This commit is contained in:
2025-07-24 01:41:56 -04:00
parent 3eb9fd0c9f
commit a4f29b3048
2 changed files with 96 additions and 28 deletions

View File

@ -11,74 +11,142 @@
import imaplib
import email
from email.header import decode_header
from bs4 import BeautifulSoup
import requests
import time
import os
import re
# ----------------------
# CONFIGURATION
# ----------------------
GMAIL_USER = os.getenv("GMAIL_USER", "your-email@gmail.com")
GMAIL_APP_PASSWORD = os.getenv("GMAIL_APP_PASSWORD", "") # Replace or use env vars
NTFY_TOPIC = os.getenv("NTFY_TOPIC", "KKddGQxVm2LoP0JF")
NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.unbl.ink")
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60")) # seconds
GMAIL_APP_PASSWORD = os.getenv("GMAIL_APP_PASSWORD", "your-app-password")
NTFY_TOPIC = os.getenv("NTFY_TOPIC", "dev-notifications")
NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.sh")
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60"))
JIRA_SENDERS = ["jira@yourcompany.com", "jira@atlassian.net"]
GITHUB_SENDER = "notifications@github.com"
JIRA_BASE_URL = os.getenv("JIRA_BASE_URL", "https://yourcompany.atlassian.net/browse")
# ----------------------
# HELPERS
# ----------------------
def sanitize_header(value: str) -> str:
return re.sub(r'[\r\n]+', ' ', value).strip()
def clean_subject(subject):
"""Decode and clean up the email subject"""
decoded, encoding = decode_header(subject)[0]
if isinstance(decoded, bytes):
decoded = decoded.decode(encoding or 'utf-8', errors='ignore')
# Remove problematic characters for HTTP headers
return decoded.replace('\r', '').replace('\n', '').strip()
return sanitize_header(decoded)
def extract_body(msg):
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
return part.get_payload(decode=True).decode(errors="ignore")
for part in msg.walk():
if part.get_content_type() == "text/html":
html = part.get_payload(decode=True).decode(errors="ignore")
return BeautifulSoup(html, "html.parser").get_text()
else:
payload = msg.get_payload(decode=True).decode(errors="ignore")
if msg.get_content_type() == "text/html":
return BeautifulSoup(payload, "html.parser").get_text()
return payload
return "(No readable body content found)"
def extract_github_link(subject, body):
pr_match = re.search(r'\(PR\s+#(\d+)\)', subject)
repo_match = re.search(r'\[([^\]]+)\]', subject) # [owner/repo]
if pr_match and repo_match:
return f"https://github.com/{repo_match.group(1)}/pull/{pr_match.group(1)}"
match = re.search(r'https://github\.com/\S+', body)
return match.group(0) if match else None
def extract_jira_link(subject):
ticket_match = re.search(r'\b([A-Z]+-\d+)\b', subject)
if ticket_match:
return f"{JIRA_BASE_URL}/{ticket_match.group(1)}"
return None
def format_message(source, subject, body, link=None):
return body.strip() or ""
return "\n".join(lines)
def send_ntfy_notification(title, message):
url = f"{NTFY_SERVER.rstrip('/')}/{NTFY_TOPIC}"
requests.post(url, data=message.encode("utf-8"), headers={"Title": title})
requests.post(url, data=message.encode("utf-8"), headers={"Title": sanitize_header(title)})
def check_github_notifications():
print("Connecting to Gmail...")
def is_github_email(from_email):
return GITHUB_SENDER in from_email.lower()
def is_jira_email(from_email, subject):
return any(s in from_email.lower() for s in JIRA_SENDERS) or re.search(r'[A-Z]+-\d+', subject)
def archive_message(mail, email_id):
try:
if isinstance(email_id, bytes):
email_id = email_id.decode()
# Properly remove the \Inbox label (archives the email)
# X-GM-LABELS is a Gmail IMAP extension
mail.store(email_id, '-X-GM-LABELS', r'(\Inbox)')
print(f"Archived message {email_id}")
except Exception as e:
print(f"Error archiving message {email_id}: {e}")
# ----------------------
# MAIN LOOP
# ----------------------
def check_notifications():
print("Checking inbox...")
mail = imaplib.IMAP4_SSL("imap.gmail.com")
mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
mail.select("inbox")
result, data = mail.search(None, '(UNSEEN FROM "notifications@github.com")')
result, data = mail.search(None, '(UNSEEN)')
mail_ids = data[0].split()
if not mail_ids:
print("No new GitHub notifications.")
print("No new notifications.")
mail.logout()
return
print(f"Found {len(mail_ids)} new GitHub notifications")
for num in mail_ids:
result, msg_data = mail.fetch(num, '(RFC822)')
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
subject = clean_subject(msg["Subject"])
from_email = msg.get("From")
subject = clean_subject(msg.get("Subject", "No Subject"))
from_email = msg.get("From", "")
body = extract_body(msg)
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode(errors="ignore")
break
else:
body = "(No plain text content found)"
if is_github_email(from_email):
source = "GitHub"
link = extract_github_link(subject, body)
elif is_jira_email(from_email, subject):
source = "Jira"
link = extract_jira_link(subject)
else:
body = msg.get_payload(decode=True).decode(errors="ignore")
print(f"Skipping non-matching email: {subject}")
continue
print(f"Sending: {subject.replace('\n', '').replace('\r', '').strip()}")
send_ntfy_notification(subject, body[:300])
message = format_message(source, subject, body, link)
print(f"Sending {source} notification: {subject}")
send_ntfy_notification(subject, message)
archive_message(mail, num)
mail.logout()
# ----------------------
# RUNNER
# ----------------------
if __name__ == "__main__":
while True:
try:
check_github_notifications()
check_notifications()
except Exception as e:
print(f"Error: {e}")
time.sleep(CHECK_INTERVAL)

View File

@ -389,7 +389,7 @@ Returns the ID string."
:type "POST"
:headers '(("Content-Type" . "application/json"))
:data (json-encode data)
:headers '(("Authorization" . "Token 27a4bde480a982e4e0bc74e9d74d052f071b1737")
:headers '(("Authorization" . "Token 58e898c0e88bd6333b1a9e8de82e81f36c4b64e")
("Content-Type" . "application/json"))
:parser 'json-read
:success (cl-function