232 lines
7.8 KiB
Python
232 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Shipping Tracker with Discord notifications"""
|
|
|
|
import socket
|
|
import sqlite3
|
|
import re
|
|
import os
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from email import policy
|
|
from email.parser import BytesParser
|
|
from datetime import datetime
|
|
|
|
# Config
#
# Every setting can be overridden through an environment variable so that
# credentials do not have to live in the source tree. The literals are kept
# only as fallbacks so existing deployments keep working unchanged.
#
# SECURITY NOTE(review): the fallback password below is committed in plain
# text. It should be rotated and supplied via SHIPPING_IMAP_PASSWORD instead.
IMAP_HOST = os.environ.get("SHIPPING_IMAP_HOST", "127.0.0.1")
IMAP_PORT = int(os.environ.get("SHIPPING_IMAP_PORT", "1143"))
USERNAME = os.environ.get("SHIPPING_IMAP_USERNAME", "alexthenerdyai@proton.me")
PASSWORD = os.environ.get("SHIPPING_IMAP_PASSWORD", "8yiNBTJBMc6HyOQjIZKjMw")
# SQLite file holding one row per shipment e-mail already processed.
DB_PATH = os.path.expanduser(
    os.environ.get("SHIPPING_DB_PATH", "~/.openclaw/workspace/data/shipping.db")
)
# Discord channel identifier (not referenced elsewhere in the visible code).
CHANNEL_ID = os.environ.get("SHIPPING_DISCORD_CHANNEL_ID", "1473701182076752135")
|
|
|
|
# Known carriers, keyed by a short lowercase id.
#   patterns: regexes tried in order to find a tracking number in the body
#   domains:  sender domains associated with the carrier
#   name:     display name stored in the database and shown in notifications
# Insertion order matters: when no carrier is detected, the patterns are
# tried carrier by carrier in the order listed here.
CARRIERS = {
    'ups': {
        'patterns': [r'1Z[A-Z0-9]{16}'],
        'domains': ['ups.com'],
        'name': 'UPS',
    },
    'fedex': {
        'patterns': [r'\b\d{12,20}\b'],
        'domains': ['fedex.com'],
        'name': 'FedEx',
    },
    'usps': {
        'patterns': [r'\b\d{20,22}\b', r'\b9\d{15,21}\b'],
        'domains': ['usps.com'],
        'name': 'USPS',
    },
    'dhl': {
        'patterns': [r'\b\d{10,11}\b'],
        'domains': ['dhl.com'],
        'name': 'DHL',
    },
    'amazon': {
        'patterns': [r'\b\d{12,14}\b'],
        'domains': ['amazon.com'],
        'name': 'Amazon',
    },
}
|
|
|
|
def init_db():
    """Create the shipments database file and table if they do not exist.

    Makes sure the parent directory of ``DB_PATH`` exists, then issues a
    ``CREATE TABLE IF NOT EXISTS`` for ``shipments`` (``message_id`` is
    declared UNIQUE). Calling it repeatedly is harmless.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename has no directory part, and os.makedirs('') would raise.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS shipments (
            id INTEGER PRIMARY KEY, message_id TEXT UNIQUE, sender TEXT, carrier TEXT,
            tracking_number TEXT, estimated_delivery TEXT, tracking_url TEXT,
            received_at TEXT, notified_at TEXT)''')
        conn.commit()
    finally:
        # Release the handle even when table creation fails.
        conn.close()
|
|
|
|
def get_response(sock, tag):
    """Read from ``sock`` until the tagged completion line for ``tag`` arrives.

    The completion marker is only honoured when it begins a line and that
    line has been received in full (terminated by CRLF). This keeps message
    text that merely mentions the marker from ending the read early, and
    keeps the tail of a split completion line from leaking into the next
    command's response.

    Args:
        sock: Connected socket-like object exposing ``recv``.
        tag: Completion prefix to wait for, e.g. ``"A4 OK"``.

    Returns:
        bytes: Everything received, including the completion line. If the
        peer closes the connection first, whatever arrived so far.
    """
    marker = tag.encode()
    line_marker = b"\r\n" + marker
    buf = bytearray()
    while True:
        chunk = sock.recv(16384)
        if not chunk:
            # Peer closed the connection; hand back what we have.
            break
        buf += chunk

        if buf.startswith(marker):
            line_start = 0
        else:
            pos = buf.rfind(line_marker)
            if pos < 0:
                continue
            line_start = pos + 2
        # Only stop once the whole completion line is in the buffer.
        if buf.find(b"\r\n", line_start) != -1:
            break
    return bytes(buf)
|
|
|
|
def extract_rfc822(data):
    """Pull the raw RFC 822 message out of an IMAP ``FETCH ... RFC822`` reply.

    When the FETCH line announces a literal (``{N}``), exactly the following
    N bytes are returned, so the message content can never be confused with
    protocol lines and the closing ``)`` is not included. If no literal size
    is present, a line-based fallback collects everything between the FETCH
    line and the ``A4``/``A5`` OK completion line.

    Args:
        data: Raw bytes of the server reply.

    Returns:
        bytes: The message bytes, or ``b''`` when no FETCH data is found.
    """
    header = re.search(
        rb'^\* \d+ FETCH \([^\r\n]*RFC822[^\r\n]*\{(\d+)\}\r\n', data, re.M
    )
    if header:
        size = int(header.group(1))
        start = header.end()
        # Slicing tolerates a truncated reply: we return what is available.
        return data[start:start + size]

    # Fallback for replies without a literal size.
    email_lines = []
    collecting = False
    for line in data.split(b'\r\n'):
        # Only the first FETCH line is the header; later lookalikes are body.
        if not collecting and b'FETCH' in line and b'RFC822' in line:
            collecting = True
            continue
        if line.startswith((b'A4 ', b'A5 ')) and b'OK' in line:
            break
        if collecting:
            email_lines.append(line)
    return b'\r\n'.join(email_lines)
|
|
|
|
def strip_html(html):
    """Reduce an HTML fragment to whitespace-normalised plain text.

    ``<style>`` and ``<script>`` elements are removed together with their
    content (even when that content contains ``<``), remaining tags are
    replaced by spaces, character references such as ``&amp;`` are decoded,
    and runs of whitespace are collapsed to a single space.

    Args:
        html: HTML source text.

    Returns:
        str: The visible text, stripped of leading/trailing whitespace.
    """
    # Local import: the parameter is itself named ``html``, so a module-level
    # ``import html`` would be shadowed inside this function.
    from html import unescape

    if not html:
        return ''
    # Non-greedy match so script/style bodies containing '<' are removed too.
    text = re.sub(r'<(style|script)\b[^>]*>.*?</\1\s*>', ' ', html,
                  flags=re.I | re.S)
    text = re.sub(r'<[^>]+>', ' ', text)
    # Decode entities only after tags are gone so "&lt;b&gt;" stays as text.
    text = unescape(text)
    return re.sub(r'\s+', ' ', text).strip()
|
|
|
|
def parse_email(msg):
    """Return the text content of an e-mail message as one string.

    For multipart messages every ``text/plain`` and ``text/html`` part is
    included; a single-part message contributes its one payload. Each part
    is decoded with its declared charset (UTF-8 when none is declared or the
    name is unknown), HTML is passed through ``strip_html``, and parts are
    joined with newlines so that digits at the end of one part cannot merge
    with digits at the start of the next.

    Args:
        msg: A parsed ``email.message.Message``-like object.

    Returns:
        str: Combined text, or ``''`` when nothing is decodable.
    """
    if msg.is_multipart():
        parts = [part for part in msg.walk()
                 if part.get_content_type() in ('text/plain', 'text/html')]
    else:
        parts = [msg]

    pieces = []
    for part in parts:
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            text = payload.decode(charset, errors='ignore')
        except LookupError:
            # Unknown or misspelled charset label: fall back to UTF-8.
            text = payload.decode('utf-8', errors='ignore')
        if part.get_content_type() == 'text/html':
            text = strip_html(text)
        pieces.append(text)
    return '\n'.join(pieces)
|
|
|
|
def detect_carrier(from_email, body, subject):
    """Guess which carrier an e-mail is about.

    The sender's domain is checked against every carrier first, because it
    is the strongest signal. Only if no domain matches are the subject and
    body searched for a carrier id or name, as whole words, so that words
    such as "groups" or "coupons" do not count as a mention of "ups".

    Args:
        from_email: Value of the ``From`` header (may include a display name).
        body: Plain-text body of the message.
        subject: Subject line.

    Returns:
        str | None: A key of ``CARRIERS``, or ``None`` when nothing matches.
    """
    from email.utils import parseaddr

    raw_from = str(from_email or '')
    address = parseaddr(raw_from)[1]
    if '@' in address:
        domain = address.rpartition('@')[2].lower()
    elif '@' in raw_from:
        # parseaddr gave up; fall back to a crude split of the raw header.
        domain = raw_from.split('@')[-1].strip(' >\t\r\n').lower()
    else:
        domain = ''

    if domain:
        for cid, info in CARRIERS.items():
            # Match the domain itself or any subdomain, not a bare substring.
            if any(domain == d or domain.endswith('.' + d)
                   for d in info['domains']):
                return cid

    text = f"{subject} {body}".lower()
    for cid, info in CARRIERS.items():
        for keyword in (cid, info['name'].lower()):
            if re.search(rf'\b{re.escape(keyword)}\b', text):
                return cid
    return None
|
|
|
|
def extract_tracking(body, carrier_id):
    """Find a tracking number in ``body``.

    When ``carrier_id`` names a known carrier only that carrier's patterns
    are tried; otherwise every carrier in ``CARRIERS`` is tried in order.

    Args:
        body: Text to search.
        carrier_id: A key of ``CARRIERS``, or a falsy/unknown value.

    Returns:
        tuple: ``(tracking_number, carrier_id)``. The number is ``None`` when
        nothing matches; the carrier is the one whose pattern matched, the
        given known carrier when its patterns fail, or ``None`` when the
        carrier was unknown and no pattern matched.
    """
    carrier_known = bool(carrier_id) and carrier_id in CARRIERS
    candidates = [carrier_id] if carrier_known else list(CARRIERS)

    for candidate in candidates:
        for pattern in CARRIERS[candidate]['patterns']:
            found = re.search(pattern, body, re.I)
            if found is not None:
                return found.group(0), candidate

    return None, (carrier_id if carrier_known else None)
|
|
|
|
def extract_delivery(body):
    """Extract a delivery-date phrase from the message text.

    Two patterns are tried in order: a full "<keyword> [weekday,] <month>
    <day>, <year>" form, then a shorter "<keyword> <word> <day>" form.
    Keywords must start on a word boundary (so "nearby" is not read as
    "by"), and the short form's day must not be followed by another digit
    (so "by march 2024" is not read as day 20).

    Args:
        body: Message text; may be empty or ``None``.

    Returns:
        str | None: The matched phrase, lower-cased and stripped, or ``None``.
    """
    if not body:
        return None
    text = body.lower()
    patterns = (
        r'\b(?:arriving|delivery|delivered by|estimated delivery)[\s:]*(?:on|by)?\s*(?:monday|tuesday|wednesday|thursday|friday|saturday|sunday)?[,\s]*([a-z]+)\s+(\d{1,2})[,\s]+(\d{4})',
        r'\b(?:arrives|delivery on|by)\s+(\w+)\s+(\d{1,2})(?!\d)',
    )
    for pattern in patterns:
        found = re.search(pattern, text)
        if found:
            return found.group(0).strip()
    return None
|
|
|
|
def get_tracking_url(carrier, num):
    """Build the public tracking-page URL for a shipment.

    Args:
        carrier: Carrier id (``'ups'``, ``'fedex'``, ``'usps'``, ``'dhl'``
            or ``'amazon'``).
        num: Tracking number to embed in the URL.

    Returns:
        str | None: The URL, or ``None`` for an unrecognised carrier.
    """
    templates = {
        'ups': 'https://www.ups.com/track?tracknum={num}',
        'fedex': 'https://www.fedex.com/fedextrack/?trknbr={num}',
        'usps': 'https://tools.usps.com/go/TrackConfirmAction?tLabels={num}',
        'dhl': 'https://www.dhl.com/en/express/tracking.html?AWB={num}',
        'amazon': 'https://track.amazon.com/tracking/{num}',
    }
    template = templates.get(carrier)
    if template is None:
        return None
    return template.format(num=num)
|
|
|
|
def process_message(sock, msg_num, conn):
    """Fetch one message, and record it if it contains a tracking number.

    Sends ``A4 FETCH <msg_num> RFC822``, parses the reply, skips messages
    whose Message-ID is missing or already present in ``shipments``, and
    otherwise inserts a new row and returns a summary for notification.

    Args:
        sock: Connected IMAP socket with a mailbox already selected.
        msg_num: Message sequence number to fetch.
        conn: Open ``sqlite3`` connection to the shipments database.

    Returns:
        dict | None: Keys ``sender``, ``carrier``, ``tracking``,
        ``estimated`` and ``url`` for a newly stored shipment; ``None`` when
        the message is unusable, already known, or has no tracking number.
    """
    # sendall: a plain send() may transmit only part of the command.
    sock.sendall(f'A4 FETCH {msg_num} RFC822\r\n'.encode())
    resp = get_response(sock, "A4 OK")
    email_data = extract_rfc822(resp)
    if not email_data:
        return None

    try:
        msg = BytesParser(policy=policy.default).parsebytes(email_data)
    except Exception:
        # Unparseable message: skip it, but do not swallow KeyboardInterrupt
        # or SystemExit the way a bare ``except:`` would.
        return None

    message_id = str(msg.get('Message-ID') or '').strip().strip('<>')
    if not message_id:
        return None

    # Skip messages that were already recorded on an earlier run.
    cursor = conn.execute('SELECT 1 FROM shipments WHERE message_id = ?', (message_id,))
    if cursor.fetchone():
        return None

    sender_email = str(msg.get('From') or '')
    if '<' in sender_email:
        # Prefer the display name; a bare "<addr>" has none, so keep the header.
        sender_name = sender_email.split('<')[0].strip() or sender_email.strip()
    else:
        sender_name = sender_email
    subject = msg.get('Subject', '(No Subject)')
    body = parse_email(msg)

    carrier_id = detect_carrier(sender_email, body, subject)
    tracking_num, carrier_id = extract_tracking(body, carrier_id)
    if not tracking_num:
        return None

    est_delivery = extract_delivery(body)
    tracking_url = get_tracking_url(carrier_id, tracking_num)
    carrier_info = CARRIERS.get(carrier_id) or {}
    carrier_name = carrier_info.get('name') or str(carrier_id).upper()

    # One timestamp so received_at and notified_at are identical.
    now = datetime.now().isoformat()
    conn.execute('''INSERT INTO shipments (message_id, sender, carrier, tracking_number,
        estimated_delivery, tracking_url, received_at, notified_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                 (message_id, sender_name, carrier_name, tracking_num, est_delivery,
                  tracking_url, now, now))
    conn.commit()

    return {
        'sender': sender_name, 'carrier': carrier_name, 'tracking': tracking_num,
        'estimated': est_delivery or 'Unknown', 'url': tracking_url
    }
|
|
|
|
def format_notification(s):
    """Render a shipment summary as a Discord-flavoured Markdown message.

    Args:
        s: Mapping with keys ``sender``, ``carrier``, ``tracking``,
            ``estimated`` and ``url``.

    Returns:
        str: Multi-line message; the "Track" line is added only when
        ``s['url']`` is truthy.
    """
    message = (
        "📦 **New Shipment Detected**\n"
        "\n"
        f"**From:** {s['sender']}\n"
        f"**Carrier:** {s['carrier']}\n"
        f"**Tracking:** `{s['tracking']}`\n"
        f"**Est. Delivery:** {s['estimated']}"
    )
    link = s['url']
    if link:
        message += f"\n**Track:** <{link}>"
    return message
|
|
|
|
def _imap_quote(value):
    """Return ``value`` as an IMAP quoted string, escaping ``\\`` and ``"``."""
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def main():
    """Check the inbox for unseen shipment e-mails and print notifications.

    Logs in to the IMAP server, searches INBOX for UNSEEN messages, runs
    each through ``process_message`` and prints one formatted notification
    per new shipment, each followed by ``---``. When nothing new is found,
    prints "No new shipments found." instead. Any error during the IMAP
    session is reported as ``Error: ...`` on stdout and ends the session.

    Returns:
        list[dict]: The shipment summaries found during this run.
    """
    init_db()
    conn = sqlite3.connect(DB_PATH)
    shipments = []

    try:
        # The context manager closes the socket even when a step below raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(15)
            sock.connect((IMAP_HOST, IMAP_PORT))
            sock.recv(1024)  # discard the server greeting

            login = f'A1 LOGIN {_imap_quote(USERNAME)} {_imap_quote(PASSWORD)}\r\n'
            sock.sendall(login.encode())
            get_response(sock, "A1 OK")

            sock.sendall(b'A2 SELECT "INBOX"\r\n')
            get_response(sock, "A2 OK")

            sock.sendall(b'A3 SEARCH UNSEEN\r\n')
            resp = get_response(sock, "A3 OK").decode('utf-8', errors='replace')

            # Message numbers are listed on untagged "* SEARCH n n n" lines.
            msg_nums = []
            for line in resp.split('\r\n'):
                if line.upper().startswith('* SEARCH'):
                    msg_nums.extend(
                        tok for tok in line.split()[2:] if tok.isdigit()
                    )

            for msg_num in msg_nums:
                s = process_message(sock, msg_num, conn)
                if s:
                    shipments.append(s)

            sock.sendall(b'A5 LOGOUT\r\n')

    except Exception as e:
        # Top-level boundary: report the failure and still print the summary.
        print(f"Error: {e}")
    finally:
        conn.close()

    # Output for Discord
    if shipments:
        for s in shipments:
            print(format_notification(s))
            print("---")
    else:
        print("No new shipments found.")

    return shipments


if __name__ == "__main__":
    main()
|