#!/usr/bin/env python3 """Shipping Tracker - Checks Proton Mail for tracking emails""" import socket import sqlite3 import re import json import os from email import policy from email.parser import BytesParser from datetime import datetime from urllib.parse import urlparse # Config IMAP_HOST = "127.0.0.1" IMAP_PORT = 1143 USERNAME = "alexthenerdyai@proton.me" PASSWORD = "8yiNBTJBMc6HyOQjIZKjMw" DB_PATH = os.path.expanduser("~/.openclaw/workspace/data/shipping.db") CHANNEL_ID = "1473701182076752135" # Tracking patterns CARRIERS = { 'ups': { 'patterns': [r'1Z[A-Z0-9]{16}', r'\b1Z[0-9A-Z]{16}\b'], 'domains': ['ups.com', 'email.ups.com'], 'name': 'UPS' }, 'fedex': { 'patterns': [r'\b\d{12}\b', r'\b\d{14}\b', r'\b\d{20}\b'], 'domains': ['fedex.com', 'email.fedex.com'], 'name': 'FedEx' }, 'usps': { 'patterns': [r'\b\d{20,22}\b', r'\b9\d{15,21}\b', r'\b[A-Z]{2}\d{9}[A-Z]{2}\b'], 'domains': ['usps.com', 'email.usps.gov'], 'name': 'USPS' }, 'dhl': { 'patterns': [r'\b\d{10}\b', r'\b\d{11}\b'], 'domains': ['dhl.com', 'dhl-usa.com'], 'name': 'DHL' }, 'amazon': { 'patterns': [r'\b\d{12,14}\b'], 'domains': ['amazon.com', 'amazon.ca', 'amazon.co.uk'], 'name': 'Amazon Logistics' }, 'ontrac': { 'patterns': [r'\bC\d{14}\b', r'\bD\d{14}\b'], 'domains': ['ontrac.com'], 'name': 'OnTrac' }, 'lasership': { 'patterns': [r'\b1LS\d{12}\b', r'\bLX\d{8,12}\b'], 'domains': ['lasership.com'], 'name': 'LaserShip' } } # Date patterns for estimated delivery DATE_PATTERNS = [ r'(?:arriving|delivery|delivered by|estimated delivery)[\s:]*(?:on|by)?\s*(monday|tuesday|wednesday|thursday|friday|saturday|sunday)?[,\s]*([a-z]+)\s+(\d{1,2})[,\s]+(\d{4})', r'(?:arrives|delivery on|by)\s+(\w+)\s+(\d{1,2})', r'(?:expected|estimated|delivery)[\s:]*(\d{1,2})[\/\-](\d{1,2})[\/\-](\d{2,4})', r'(?:will arrive|delivered)\s+(?:on\s+)?(\w+day),?\s*(\w+)\s*(\d{1,2})', ] def init_db(): """Initialize SQLite database""" os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.execute(''' CREATE TABLE IF NOT EXISTS shipments ( id INTEGER PRIMARY KEY AUTOINCREMENT, message_id TEXT UNIQUE, sender TEXT, sender_email TEXT, subject TEXT, carrier TEXT, tracking_number TEXT, estimated_delivery TEXT, tracking_url TEXT, received_at TEXT, notified_at TEXT, delivered INTEGER DEFAULT 0 ) ''') conn.commit() conn.close() def get_response(sock, tag): """Read IMAP response until tag""" response = b"" while True: chunk = sock.recv(16384) if not chunk: break response += chunk if tag.encode() in response: break return response def extract_rfc822(data): """Extract email from IMAP FETCH response""" lines = data.split(b'\r\n') email_lines = [] collect = False for line in lines: if b'FETCH' in line and b'RFC822' in line: collect = True continue if line.startswith((b'A4 ', b'A5 ', b'A6 ', b'A7 ')) and b'OK' in line: break if collect: email_lines.append(line) return b'\r\n'.join(email_lines) def strip_html(html): """Convert HTML to plain text""" import re html = re.sub(r'<(style|script)[^>]*>[^<]*', ' ', html, flags=re.IGNORECASE | re.DOTALL) html = re.sub(r'', '\n', html, flags=re.IGNORECASE) html = re.sub(r'<[^>]+>', ' ', html) html = re.sub(r'\s+', ' ', html) return html.strip() def parse_email(msg): """Extract email content""" body = '' if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() if content_type == 'text/plain': payload = part.get_payload(decode=True) if payload: body += payload.decode('utf-8', errors='ignore') elif content_type == 'text/html': payload = part.get_payload(decode=True) if payload: body += strip_html(payload.decode('utf-8', errors='ignore')) else: payload = msg.get_payload(decode=True) if payload: body = payload.decode('utf-8', errors='ignore') return body def detect_carrier(from_email, body, subject): """Detect carrier from email domain and content""" domain = from_email.split('@')[-1].lower() if '@' in from_email else '' text = f"{subject} {body}".lower() for carrier_id, info in CARRIERS.items(): # Check domain if any(d in domain for d in info['domains']): return carrier_id # Check for carrier name in text if carrier_id in text or info['name'].lower() in text: return carrier_id return None def extract_tracking(body, carrier_id): """Extract tracking number from email body""" if not carrier_id or carrier_id not in CARRIERS: # Try all patterns for cid, info in CARRIERS.items(): for pattern in info['patterns']: match = re.search(pattern, body, re.IGNORECASE) if match: return match.group(0), cid return None, None # Use carrier-specific patterns for pattern in CARRIERS[carrier_id]['patterns']: match = re.search(pattern, body, re.IGNORECASE) if match: return match.group(0), carrier_id return None, carrier_id def extract_delivery_date(body): """Extract estimated delivery date from email""" text = body.lower() months = ['january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december', 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec'] for pattern in DATE_PATTERNS: match = re.search(pattern, text) if match: return match.group(0).strip() return None def get_tracking_url(carrier, tracking_num): """Generate tracking URL for carrier""" urls = { 'ups': f'https://www.ups.com/track?tracknum={tracking_num}', 'fedex': f'https://www.fedex.com/fedextrack/?trknbr={tracking_num}', 'usps': f'https://tools.usps.com/go/TrackConfirmAction?tLabels={tracking_num}', 'dhl': f'https://www.dhl.com/en/express/tracking.html?AWB={tracking_num}', 'amazon': f'https://track.amazon.com/tracking/{tracking_num}', 'ontrac': f'https://www.ontrac.com/tracking/?number={tracking_num}', 'lasership': f'https://www.lasership.com/track/{tracking_num}' } return urls.get(carrier, None) def process_message(sock, msg_num, conn): """Process a single email message""" # Fetch full message sock.send(f'A4 FETCH {msg_num} RFC822\r\n'.encode()) resp = get_response(sock, "A4 OK") email_data = extract_rfc822(resp) if not email_data: return None try: msg = BytesParser(policy=policy.default).parsebytes(email_data) except: return None message_id = msg.get('Message-ID', '').strip('<>') if not message_id: return None # Check if already processed cursor = conn.execute('SELECT 1 FROM shipments WHERE message_id = ?', (message_id,)) if cursor.fetchone(): return None # Parse email sender_email = msg.get('From', '') sender_name = sender_email.split('<')[0].strip() if '<' in sender_email else sender_email subject = msg.get('Subject', '(No Subject)') body = parse_email(msg) # Detect carrier and tracking carrier_id = detect_carrier(sender_email, body, subject) tracking_num, carrier_id = extract_tracking(body, carrier_id) if not tracking_num: return None est_delivery = extract_delivery_date(body) tracking_url = get_tracking_url(carrier_id, tracking_num) # Save to database carrier_name = CARRIERS.get(carrier_id, {}).get('name', carrier_id.upper()) conn.execute(''' INSERT INTO shipments (message_id, sender, sender_email, subject, carrier, tracking_number, estimated_delivery, tracking_url, received_at, notified_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( message_id, sender_name, sender_email, subject, carrier_name, tracking_num, est_delivery, tracking_url, datetime.now().isoformat(), datetime.now().isoformat() )) conn.commit() return { 'sender': sender_name, 'carrier': carrier_name, 'tracking': tracking_num, 'estimated': est_delivery or 'Unknown', 'url': tracking_url, 'subject': subject } def format_notification(shipment): """Format shipment for Discord notification""" lines = [ f"\ud83d\ude9a **New Shipment Detected**", f"", f"**From:** {shipment['sender']}", f"**Carrier:** {shipment['carrier']}", f"**Tracking:** `{shipment['tracking']}`", f"**Est. Delivery:** {shipment['estimated']}" ] if shipment['url']: lines.append(f"**Track:** <{shipment['url']}>") return '\n'.join(lines) def send_discord_notification(content, channel_id): """Send notification to Discord (placeholder - will use message tool or sessions)""" # This will be called via sessions_send or we can write to a file for cron print(f"[NOTIFY] Channel {channel_id}: {content[:100]}...") return True def main(): init_db() conn = sqlite3.connect(DB_PATH) print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] Checking for new shipments...") try: # Connect to IMAP sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(15) sock.connect((IMAP_HOST, IMAP_PORT)) sock.recv(1024) # Login sock.send(f'A1 LOGIN "{USERNAME}" "{PASSWORD}"\r\n'.encode()) get_response(sock, "A1 OK") # Select INBOX sock.send(b'A2 SELECT "INBOX"\r\n') get_response(sock, "A2 OK") # Search unread messages sock.send(b'A3 SEARCH UNSEEN\r\n') resp = get_response(sock, "A3 OK").decode() msg_nums = [] for line in resp.split('\r\n'): if 'SEARCH' in line and '*' in line: parts = line.split('SEARCH') if len(parts) > 1: msg_nums = [n for n in parts[1].strip().split() if n.isdigit()] if not msg_nums: print("[INFO] No unread messages") conn.close() return [] print(f"[INFO] {len(msg_nums)} unread messages to check") shipments = [] for msg_num in msg_nums: shipment = process_message(sock, msg_num, conn) if shipment: shipments.append(shipment) print(f"[FOUND] {shipment['carrier']} - {shipment['tracking']}") # Logout sock.send(b'A5 LOGOUT\r\n') sock.close() # Output results for Discord if shipments: output = {'channel': CHANNEL_ID, 'shipments': shipments} output_path = os.path.expanduser('~/.openclaw/workspace/data/shipping_output.json') with open(output_path, 'w') as f: json.dump(output, f) print(f"[INFO] Found {len(shipments)} shipments, saved to {output_path}") else: print("[INFO] No new shipments found") return shipments except Exception as e: print(f"[ERR] {e}") import traceback traceback.print_exc() return [] finally: conn.close() if __name__ == "__main__": shipments = main() if shipments: for s in shipments: print("\n" + format_notification(s))