Files
openclaw-workspace/tools/proton_read_email.py
2026-04-11 09:45:12 -05:00

239 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""Read emails from Proton Mail INBOX - Full parsing"""
import email
import os
import socket
from email import policy
from email.parser import BytesParser
from html.parser import HTMLParser
# Connection settings for the IMAP endpoint this script talks to.
# Each value can be overridden through an environment variable so the script
# can be pointed at another endpoint/account without editing the source; the
# literals are kept only as backward-compatible fallbacks.
#
# SECURITY: a real-looking password is committed here in plain text.
# TODO(owner): rotate this credential, supply it via PROTON_IMAP_PASSWORD,
# and remove the literal default.
IMAP_HOST = os.environ.get("PROTON_IMAP_HOST", "127.0.0.1")
IMAP_PORT = int(os.environ.get("PROTON_IMAP_PORT", "1143"))
USERNAME = os.environ.get("PROTON_IMAP_USERNAME", "alexthenerdyai@proton.me")
PASSWORD = os.environ.get("PROTON_IMAP_PASSWORD", "8yiNBTJBMc6HyOQjIZKjMw")
def strip_html(html):
    """Convert HTML markup to clean plain text.

    Args:
        html: HTML markup as a ``str``.

    Returns:
        The text with ``<style>``/``<script>`` elements and every tag removed,
        block boundaries and ``<br>`` turned into line breaks, runs of blanks
        collapsed, character references decoded exactly once, and leading and
        trailing whitespace stripped.
    """
    import re
    # The parameter shadows the ``html`` module, so import the function itself.
    from html import unescape

    # Remove style/script elements and their contents. The non-greedy ``.*?``
    # (with DOTALL) also handles bodies that contain ``<``, e.g. ``a < b``.
    text = re.sub(r'<(style|script)\b[^>]*>.*?</\1\s*>', ' ', html,
                  flags=re.IGNORECASE | re.DOTALL)
    # <br> is a void element: it appears as <br>, <br/> or <br />, never as
    # the closing form matched below, so it needs its own rule.
    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
    # Replace the end of common block elements with newlines
    text = re.sub(r'</(p|div|h[1-6]|li|tr|br)>', '\n', text, flags=re.IGNORECASE)
    # Replace table cell ends with tabs (collapsed to one space further down)
    text = re.sub(r'</(td|th)>', '\t', text, flags=re.IGNORECASE)
    # Remove all remaining HTML tags
    text = re.sub(r'<[^>]+>', ' ', text)
    # Remove CSS @media blocks that survived outside a <style> element
    text = re.sub(r'@media[^{]*\{[^}]*\}', ' ', text)
    # Clean up whitespace
    text = re.sub(r'[\t ]+', ' ', text)
    text = re.sub(r'\n[\n ]+', '\n\n', text)

    # Typographic punctuation is folded to ASCII. This runs before the
    # general decode so an escaped reference such as "&amp;#8217;" is not
    # decoded twice.
    ascii_punctuation = (
        ('&#8217;', "'"),
        ('&#8216;', "'"),
        ('&#8220;', '"'),
        ('&#8221;', '"'),
        ('&#8230;', '...'),
    )
    for entity, plain in ascii_punctuation:
        text = text.replace(entity, plain)

    # Decode every remaining character reference in a single pass; keep the
    # non-breaking space rendered as an ordinary space.
    text = unescape(text).replace('\xa0', ' ')
    return text.strip()
def get_response(sock, tag):
    """Read from the socket until the tagged completion line has arrived.

    Args:
        sock: Object with a ``recv(size)`` method returning ``bytes``.
        tag: Start of the completion line to wait for, e.g. ``"A4 OK"``.

    Returns:
        Everything received, as ``bytes``. Reading stops once the last
        complete (CRLF-terminated) line starts with ``tag``, or when the peer
        closes the connection (``recv`` returns an empty chunk).
    """
    marker = tag.encode()
    response = bytearray()
    while True:
        chunk = sock.recv(16384)
        if not chunk:
            break
        response += chunk
        # Only a fully received final line can be the completion line.
        # Searching the whole buffer would stop early when message text
        # merely contains the tag, or when the tagged line is half-received.
        if not response.endswith(b'\r\n'):
            continue
        previous_eol = response.rfind(b'\r\n', 0, len(response) - 2)
        last_line_start = 0 if previous_eol == -1 else previous_eol + 2
        if response.startswith(marker, last_line_start):
            break
    return bytes(response)
def extract_rfc822(data):
    """Extract the raw RFC822 message from an IMAP FETCH response.

    Args:
        data: Complete server response to a ``FETCH <n> RFC822`` command,
            as ``bytes``.

    Returns:
        The message bytes, or ``b''`` when ``data`` contains no CRLF at all.
    """
    import re

    if b'\r\n' not in data:
        return b''

    # Preferred path: the FETCH line announces the message as a literal,
    # "{N}" followed by CRLF and exactly N bytes. Slicing by that count keeps
    # the closing ")" of the FETCH response out of the message and is immune
    # to body lines that happen to look like protocol lines.
    literal = re.search(rb'FETCH[^\r\n]*RFC822[^\r\n]*\{(\d+)\}\r\n', data)
    if literal:
        begin = literal.end()
        return data[begin:begin + int(literal.group(1))]

    # Fallback when no literal size is present: collect the lines between
    # the FETCH line and the tagged OK line.
    email_lines = []
    collect = False
    for line in data.split(b'\r\n'):
        # Start collecting after the FETCH response line
        if b'FETCH' in line and b'RFC822' in line:
            collect = True
            continue
        # Stop at the tagged OK line
        if line.startswith((b'A4 ', b'A5 ', b'A6 ')) and b'OK' in line:
            break
        if collect:
            email_lines.append(line)
    return b'\r\n'.join(email_lines)
def _decode_part_text(part):
    """Return the decoded text of a message part, or None if it has no payload."""
    payload = part.get_payload(decode=True)
    if not payload:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except (LookupError, ValueError):
        # Unknown or malformed charset name declared by the sender
        return payload.decode('utf-8', errors='replace')


def parse_email_body(msg):
    """Extract the readable body parts of an email message.

    Args:
        msg: A parsed ``email.message.Message`` (or subclass).

    Returns:
        A list of ``(kind, text)`` tuples. For a multipart message, ``kind``
        is ``"text"`` for ``text/plain`` parts and ``"html"`` for
        ``text/html`` parts (already converted to plain text); attachments
        and other content types are skipped. A single-part message yields at
        most one ``("text", ...)`` entry, with HTML converted to plain text.
    """
    body_parts = []

    if not msg.is_multipart():
        text = _decode_part_text(msg)
        if text is not None:
            if msg.get_content_type() == "text/html":
                text = strip_html(text)
            body_parts.append(("text", text))
        return body_parts

    for part in msg.walk():
        # get_content_disposition() lower-cases the value, so "ATTACHMENT"
        # is recognised as well.
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        # Only text parts are ever reported; skip decoding everything else.
        if content_type not in ("text/plain", "text/html"):
            continue
        text = _decode_part_text(part)
        if text is None:
            continue
        if content_type == "text/plain":
            body_parts.append(("text", text))
        else:
            body_parts.append(("html", strip_html(text)))
    return body_parts
def format_email(msg):
    """Format an email message for display.

    Args:
        msg: A parsed email message supporting header lookup by name.

    Returns:
        A multi-line string: a header block (FROM, TO, DATE, SUBJECT) followed
        by the body with blank lines removed and limited to 2000 characters,
        or a "[No readable body content]" notice when no part contains text.
    """
    rule = "=" * 60
    output = [
        rule,
        f"FROM: {msg['From'] or '(Unknown)'}",
        f"TO: {msg['To'] or '(Unknown)'}",
        f"DATE: {msg['Date'] or '(No Date)'}",
        f"SUBJECT: {msg['Subject'] or '(No Subject)'}",
        rule,
    ]

    body_parts = parse_email_body(msg)

    # Prefer plain text, but use HTML if that's all we have. Blank parts are
    # passed over so an empty text/plain part cannot hide a usable HTML one.
    text_body = next(
        (text for kind, text in body_parts if kind == "text" and text.strip()),
        None,
    )
    if text_body is None:
        text_body = next((text for _, text in body_parts if text.strip()), None)

    if text_body is None:
        output.append("\n[No readable body content]")
        return '\n'.join(output)

    output.append("\nBODY:")
    output.append("-" * 60)
    # Clean up whitespace: trim every line and drop the empty ones
    stripped_lines = (line.strip() for line in text_body.split('\n'))
    text_body = '\n'.join(line for line in stripped_lines if line)
    output.append(text_body[:2000])  # Limit length
    if len(text_body) > 2000:
        output.append("\n[... message truncated ...]")
    return '\n'.join(output)
def read_message(sock, msg_num):
    """Fetch one message over IMAP and print it.

    Args:
        sock: Connected socket with a mailbox already selected.
        msg_num: Message sequence number to fetch.

    Prints the formatted message, or an ``[ERR]`` line when the message
    cannot be extracted from the response or cannot be parsed.
    """
    # sendall() retries until the whole command is written; send() may
    # transmit only part of the buffer.
    sock.sendall(f'A4 FETCH {msg_num} RFC822\r\n'.encode())
    resp = get_response(sock, "A4 OK")
    email_data = extract_rfc822(resp)
    if not email_data:
        print(f"[ERR] Could not extract message {msg_num}")
        return
    try:
        msg = BytesParser(policy=policy.default).parsebytes(email_data)
        print(format_email(msg))
    except Exception as e:
        # Display boundary: report the failure and show the start of the raw
        # data instead of aborting the session.
        print(f"[ERR] Failed to parse: {e}")
        print("RAW:", email_data[:500])
def main():
    """Log in to the IMAP server, list the INBOX and print its first message.

    Connects to ``IMAP_HOST:IMAP_PORT`` with a 15 second timeout, logs in
    with ``USERNAME``/``PASSWORD``, selects INBOX, runs ``SEARCH ALL`` and
    displays the first message number returned. Socket errors (including
    timeouts) propagate to the caller; the socket is closed in every case.
    """
    print("Proton Mail - Email Reader")
    print("=" * 60)

    def quoted(value):
        # IMAP quoted strings require backslash and double quote to be escaped
        return value.replace('\\', '\\\\').replace('"', '\\"')

    # The context manager closes the socket even when a step below raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Connect
        sock.settimeout(15)
        sock.connect((IMAP_HOST, IMAP_PORT))
        sock.recv(1024)  # Greeting

        # Login
        login = f'A1 LOGIN "{quoted(USERNAME)}" "{quoted(PASSWORD)}"\r\n'
        sock.sendall(login.encode())
        if b"A1 OK" not in get_response(sock, "A1 OK"):
            # The connection ended without the server confirming the login
            print("[ERR] Login failed")
            return
        print("[LOGIN] Success\n")

        # Select INBOX
        sock.sendall(b'A2 SELECT "INBOX"\r\n')
        get_response(sock, "A2 OK")

        # Get message list
        sock.sendall(b'A3 SEARCH ALL\r\n')
        resp = get_response(sock, "A3 OK").decode(errors='replace')
        msg_nums = []
        for line in resp.split('\r\n'):
            # Untagged result line, e.g. "* SEARCH 1 2 3"
            if 'SEARCH' in line and '*' in line:
                parts = line.split('SEARCH')
                if len(parts) > 1:
                    msg_nums = [n for n in parts[1].strip().split() if n.isdigit()]
        print(f"[INFO] {len(msg_nums)} messages in INBOX\n")

        # Read oldest (first) message
        if msg_nums:
            read_message(sock, msg_nums[0])

        # Logout
        sock.sendall(b'A5 LOGOUT\r\n')

    print("\n" + "=" * 60)
    print("[DONE]")
# Run the reader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()