[bin] Add new work check email script
This commit is contained in:
84
bin/.bin/checkworkmail.py
Executable file
84
bin/.bin/checkworkmail.py
Executable file
@ -0,0 +1,84 @@
|
||||
#!/usr/bin/env -S uv run --script
|
||||
#
|
||||
# /// script
|
||||
# dependencies = [
|
||||
# "requests",
|
||||
# "beautifulsoup4",
|
||||
# "imaplib2",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
import imaplib
|
||||
import email
|
||||
from email.header import decode_header
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
|
||||
# ----------------------
|
||||
# CONFIGURATION
|
||||
# ----------------------
|
||||
GMAIL_USER = os.getenv("GMAIL_USER", "your-email@gmail.com")
|
||||
GMAIL_APP_PASSWORD = os.getenv("GMAIL_APP_PASSWORD", "") # Replace or use env vars
|
||||
NTFY_TOPIC = os.getenv("NTFY_TOPIC", "KKddGQxVm2LoP0JF")
|
||||
NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.unbl.ink")
|
||||
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "60")) # seconds
|
||||
|
||||
def clean_subject(subject):
|
||||
"""Decode and clean up the email subject"""
|
||||
decoded, encoding = decode_header(subject)[0]
|
||||
if isinstance(decoded, bytes):
|
||||
decoded = decoded.decode(encoding or 'utf-8', errors='ignore')
|
||||
# Remove problematic characters for HTTP headers
|
||||
return decoded.replace('\r', '').replace('\n', '').strip()
|
||||
|
||||
def send_ntfy_notification(title, message):
|
||||
url = f"{NTFY_SERVER.rstrip('/')}/{NTFY_TOPIC}"
|
||||
requests.post(url, data=message.encode("utf-8"), headers={"Title": title})
|
||||
|
||||
def check_github_notifications():
|
||||
print("Connecting to Gmail...")
|
||||
mail = imaplib.IMAP4_SSL("imap.gmail.com")
|
||||
mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
|
||||
mail.select("inbox")
|
||||
|
||||
result, data = mail.search(None, '(UNSEEN FROM "notifications@github.com")')
|
||||
mail_ids = data[0].split()
|
||||
|
||||
if not mail_ids:
|
||||
print("No new GitHub notifications.")
|
||||
mail.logout()
|
||||
return
|
||||
|
||||
print(f"Found {len(mail_ids)} new GitHub notifications")
|
||||
|
||||
for num in mail_ids:
|
||||
result, msg_data = mail.fetch(num, '(RFC822)')
|
||||
raw_email = msg_data[0][1]
|
||||
msg = email.message_from_bytes(raw_email)
|
||||
|
||||
subject = clean_subject(msg["Subject"])
|
||||
from_email = msg.get("From")
|
||||
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
if part.get_content_type() == "text/plain":
|
||||
body = part.get_payload(decode=True).decode(errors="ignore")
|
||||
break
|
||||
else:
|
||||
body = "(No plain text content found)"
|
||||
else:
|
||||
body = msg.get_payload(decode=True).decode(errors="ignore")
|
||||
|
||||
print(f"Sending: {subject.replace('\n', '').replace('\r', '').strip()}")
|
||||
send_ntfy_notification(subject, body[:300])
|
||||
|
||||
mail.logout()
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
try:
|
||||
check_github_notifications()
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
time.sleep(CHECK_INTERVAL)
|
||||
Reference in New Issue
Block a user