Files
openclaw-workspace/tools/shipping_tracker_cron.py
2026-04-11 09:45:12 -05:00

232 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""Shipping Tracker with Discord notifications"""
import socket
import sqlite3
import re
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from email import policy
from email.parser import BytesParser
from datetime import datetime
# Config
# IMAP connection settings. USERNAME and PASSWORD may be supplied through the
# SHIPPING_IMAP_USERNAME / SHIPPING_IMAP_PASSWORD environment variables; the
# literals below are fallbacks kept only so existing deployments keep working.
IMAP_HOST = "127.0.0.1"
IMAP_PORT = 1143
USERNAME = os.environ.get("SHIPPING_IMAP_USERNAME", "alexthenerdyai@proton.me")
# SECURITY(review): a credential is committed in source. Rotate it, export
# SHIPPING_IMAP_PASSWORD in the job's environment, then delete this fallback.
PASSWORD = os.environ.get("SHIPPING_IMAP_PASSWORD", "8yiNBTJBMc6HyOQjIZKjMw")
# SQLite database file; "~" is expanded to the current user's home directory.
DB_PATH = os.path.expanduser("~/.openclaw/workspace/data/shipping.db")
# Presumably the Discord channel that receives the notifications; it is not
# referenced by any code visible in this file - TODO confirm against callers.
CHANNEL_ID = "1473701182076752135"
# Per-carrier detection rules, keyed by carrier id.
#   patterns - regexes searched in the email body to find a tracking number
#   domains  - sender-domain fragments that identify the carrier
#   name     - display name stored in the database and shown in notifications
# Insertion order matters: the lookup loops walk this mapping in order and
# return on the first hit, so earlier carriers win when patterns overlap.
CARRIERS = dict(
    ups=dict(
        patterns=[r'1Z[A-Z0-9]{16}'],
        domains=['ups.com'],
        name='UPS',
    ),
    fedex=dict(
        patterns=[r'\b\d{12,20}\b'],
        domains=['fedex.com'],
        name='FedEx',
    ),
    usps=dict(
        patterns=[r'\b\d{20,22}\b', r'\b9\d{15,21}\b'],
        domains=['usps.com'],
        name='USPS',
    ),
    dhl=dict(
        patterns=[r'\b\d{10,11}\b'],
        domains=['dhl.com'],
        name='DHL',
    ),
    amazon=dict(
        patterns=[r'\b\d{12,14}\b'],
        domains=['amazon.com'],
        name='Amazon',
    ),
)
def init_db():
    """Create the database directory and the ``shipments`` table if missing.

    The directory containing ``DB_PATH`` is created when absent. The table is
    created with ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly
    is harmless. ``message_id`` is declared UNIQUE.
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS shipments (
id INTEGER PRIMARY KEY, message_id TEXT UNIQUE, sender TEXT, carrier TEXT,
tracking_number TEXT, estimated_delivery TEXT, tracking_url TEXT,
received_at TEXT, notified_at TEXT)''')
        conn.commit()
    finally:
        # Close the connection even when the DDL or commit raises.
        conn.close()
def get_response(sock, tag):
    """Read from ``sock`` until ``tag`` shows up in the accumulated reply.

    Args:
        sock: Connected socket-like object providing ``recv``.
        tag: Text that marks the end of the reply (callers pass values such
            as ``"A1 OK"``). It is matched anywhere in the received bytes.

    Returns:
        bytes: Everything received so far, including the chunk that contains
        ``tag``. If the peer closes the connection first, whatever arrived.

    Note:
        Only the exact ``tag`` text ends the read. A reply that never
        contains it (for example a tagged NO/BAD status) keeps the loop in
        ``recv`` until the peer closes or ``recv`` raises.
    """
    needle = tag.encode()  # encode once rather than once per chunk
    buffer = bytearray()   # amortised O(1) append, unlike repeated bytes +=
    while True:
        chunk = sock.recv(16384)
        if not chunk:
            break
        # A match not seen on earlier iterations must overlap the new chunk,
        # so only the tail (new bytes plus len(needle) - 1 of overlap) needs
        # scanning instead of the whole buffer.
        scan_from = max(0, len(buffer) - len(needle) + 1)
        buffer += chunk
        if buffer.find(needle, scan_from) != -1:
            break
    return bytes(buffer)
def extract_rfc822(data):
    """Pull the raw message bytes out of an IMAP ``FETCH ... RFC822`` reply.

    When the FETCH line announces the message as a literal (``{n}`` at the
    end of the line), exactly ``n`` bytes following that line are returned.
    This keeps the closing ``)`` of the FETCH response out of the message and
    is not confused by body lines that happen to look like a tagged status.

    Without a literal size the function falls back to a line scan: it
    collects every line after the FETCH line until a line that starts with
    ``A4 `` or ``A5 `` and contains ``OK``.

    Args:
        data: Raw bytes of the server reply.

    Returns:
        bytes: The message bytes, or ``b''`` when no FETCH line is found.
    """
    lines = data.split(b'\r\n')
    email_lines, collect = [], False
    offset = 0  # byte position in ``data`` just past the current line's CRLF
    for line in lines:
        offset += len(line) + 2
        if b'FETCH' in line and b'RFC822' in line:
            if not collect:
                literal = re.search(rb'\{(\d+)\}$', line)
                if literal:
                    size = int(literal.group(1))
                    return data[offset:offset + size]
            collect = True
            continue
        if line.startswith((b'A4 ', b'A5 ')) and b'OK' in line:
            break
        if collect:
            email_lines.append(line)
    return b'\r\n'.join(email_lines)
def strip_html(html):
    """Reduce an HTML fragment to whitespace-normalised plain text.

    Steps, in order:
      1. Remove ``<style>`` / ``<script>`` elements together with their
         content.
      2. Replace selected closing tags with a newline.
      3. Replace every remaining tag with a space.
      4. Collapse all whitespace runs (including the newlines from step 2)
         into single spaces and trim the ends.

    Args:
        html: HTML text.

    Returns:
        str: The text content on a single line.
    """
    # The element body is matched lazily with DOTALL so that a '<' inside a
    # script or stylesheet (e.g. "a<b") does not prevent the removal.
    html = re.sub(r'<(style|script)[^>]*>.*?</\1>', ' ', html, flags=re.I|re.S)
    html = re.sub(r'</(p|div|h[1-6]|li|tr|br)>', '\n', html, flags=re.I)
    html = re.sub(r'<[^>]+>', ' ', html)
    return re.sub(r'\s+', ' ', html).strip()
def _decode_payload(part, payload):
    """Decode ``payload`` bytes using the part's declared charset (default UTF-8)."""
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8.
        return payload.decode('utf-8', errors='ignore')


def parse_email(msg):
    """Return the textual body of an email message as one string.

    For multipart messages every ``text/plain`` and ``text/html`` part is
    decoded (HTML parts are passed through ``strip_html``) and the pieces are
    joined with a newline, so text from adjacent parts cannot run together
    into one token. For single-part messages the payload is decoded, and
    stripped of markup when its content type is ``text/html``.

    Each payload is decoded with the charset declared on its part, falling
    back to UTF-8; undecodable bytes are dropped.

    Args:
        msg: A parsed ``email`` message object.

    Returns:
        str: The combined body text, or ``''`` when there is no payload.
    """
    if not msg.is_multipart():
        payload = msg.get_payload(decode=True)
        if not payload:
            return ''
        text = _decode_payload(msg, payload)
        if msg.get_content_type() == 'text/html':
            return strip_html(text)
        return text

    pieces = []
    for part in msg.walk():
        ct = part.get_content_type()
        if ct not in ('text/plain', 'text/html'):
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        text = _decode_payload(part, payload)
        pieces.append(strip_html(text) if ct == 'text/html' else text)
    return '\n'.join(pieces)
def detect_carrier(from_email, body, subject):
    """Guess which carrier an email is about.

    Detection runs in two passes over ``CARRIERS``:

    1. Sender domain: the address is taken from the ``From`` value and its
       domain must equal, or be a subdomain of, one of a carrier's domains.
       All carriers are checked by domain before any text matching, so a
       mail from one carrier that merely mentions another is attributed to
       the sender.
    2. Text: the carrier id or display name must appear as a whole word in
       the lower-cased subject and body, so "ups" is not found inside words
       such as "groups".

    Args:
        from_email: Raw ``From`` header value (may include a display name).
        body: Message body text.
        subject: Message subject.

    Returns:
        The carrier id (a key of ``CARRIERS``) or ``None`` when nothing
        matches.
    """
    # Local import: this name is not among the file's top-level imports.
    from email.utils import parseaddr

    address = parseaddr(from_email)[1] or from_email
    domain = address.rsplit('@', 1)[-1].strip().strip('>').lower() if '@' in address else ''

    for cid, info in CARRIERS.items():
        if any(domain == d or domain.endswith('.' + d) for d in info['domains']):
            return cid

    text = f"{subject} {body}".lower()
    for cid, info in CARRIERS.items():
        for word in (cid, info['name'].lower()):
            if re.search(rf'\b{re.escape(word)}\b', text):
                return cid
    return None
def extract_tracking(body, carrier_id):
    """Find the first tracking number in ``body``.

    When ``carrier_id`` names a known carrier only that carrier's patterns
    are tried. Otherwise every carrier in ``CARRIERS`` is tried in order and
    the carrier is inferred from whichever pattern matches first. Patterns
    are matched case-insensitively.

    Args:
        body: Text to search.
        carrier_id: A key of ``CARRIERS``, or a falsy/unknown value.

    Returns:
        tuple: ``(tracking_number, carrier_id)`` on success. On failure
        ``(None, carrier_id)`` for a known carrier, else ``(None, None)``.
    """
    carrier_known = bool(carrier_id) and carrier_id in CARRIERS
    candidates = [carrier_id] if carrier_known else list(CARRIERS)

    for candidate in candidates:
        for pattern in CARRIERS[candidate]['patterns']:
            hit = re.search(pattern, body, re.I)
            if hit:
                return hit.group(0), candidate

    if carrier_known:
        return None, carrier_id
    return None, None
def extract_delivery(body):
    """Find a delivery-date phrase in ``body``.

    Two patterns are tried in order against the lower-cased text: a full
    "<keyword> [weekday] <month> <day>, <year>" form, then a shorter
    "<keyword> <word> <day>" form. Keywords must start at a word boundary so
    that, for instance, the "by" inside "nearby" is not taken as a keyword.

    Args:
        body: Message body text.

    Returns:
        The matched phrase, lower-cased, or ``None`` when nothing matches.
    """
    patterns = [
        r'\b(?:arriving|delivery|delivered by|estimated delivery)[\s:]*(?:on|by)?\s*(?:monday|tuesday|wednesday|thursday|friday|saturday|sunday)?[,\s]*([a-z]+)\s+(\d{1,2})[,\s]+(\d{4})',
        r'\b(?:arrives|delivery on|by)\s+(\w+)\s+(\d{1,2})',
    ]
    lowered = body.lower()  # lower-case once instead of once per pattern
    for pat in patterns:
        m = re.search(pat, lowered)
        if m:
            return m.group(0).strip()
    return None
def get_tracking_url(carrier, num):
    """Build the public tracking-page URL for a tracking number.

    Args:
        carrier: Carrier id (``'ups'``, ``'fedex'``, ``'usps'``, ``'dhl'`` or
            ``'amazon'``).
        num: Tracking number inserted into the URL as-is.

    Returns:
        The URL string, or ``None`` for any other carrier value.
    """
    templates = {
        'ups': 'https://www.ups.com/track?tracknum={num}',
        'fedex': 'https://www.fedex.com/fedextrack/?trknbr={num}',
        'usps': 'https://tools.usps.com/go/TrackConfirmAction?tLabels={num}',
        'dhl': 'https://www.dhl.com/en/express/tracking.html?AWB={num}',
        'amazon': 'https://track.amazon.com/tracking/{num}',
    }
    template = templates.get(carrier)
    if template is None:
        return None
    return template.format(num=num)
def process_message(sock, msg_num, conn):
    """Fetch one message, detect a shipment in it and record it.

    The message is fetched with ``FETCH <msg_num> RFC822``, parsed, and
    skipped when it has no Message-ID, when that Message-ID is already in the
    ``shipments`` table, or when no tracking number is found in the body.
    Otherwise a row is inserted and committed.

    Args:
        sock: Socket with an authenticated IMAP session and a selected mailbox.
        msg_num: Message sequence number to fetch.
        conn: Open SQLite connection to the shipments database.

    Returns:
        dict with keys ``sender``, ``carrier``, ``tracking``, ``estimated``
        and ``url`` for a newly recorded shipment, otherwise ``None``.

    Note:
        ``received_at`` and ``notified_at`` are both set to the time of the
        insert; ``notified_at`` does not reflect an actual delivery of the
        notification.
    """
    # sendall: plain send() may write only part of the command.
    sock.sendall(f'A4 FETCH {msg_num} RFC822\r\n'.encode())
    resp = get_response(sock, "A4 OK")
    email_data = extract_rfc822(resp)
    if not email_data:
        return None
    try:
        msg = BytesParser(policy=policy.default).parsebytes(email_data)
    except Exception:
        # Unparseable message: skip it. Catching Exception rather than using a
        # bare except lets KeyboardInterrupt/SystemExit propagate.
        return None

    message_id = msg.get('Message-ID', '').strip('<>')
    if not message_id:
        return None
    cursor = conn.execute('SELECT 1 FROM shipments WHERE message_id = ?', (message_id,))
    if cursor.fetchone():
        return None  # already recorded on an earlier run

    sender_email = msg.get('From', '')
    sender_name = sender_email.split('<')[0].strip() if '<' in sender_email else sender_email
    subject = msg.get('Subject', '(No Subject)')
    body = parse_email(msg)

    carrier_id = detect_carrier(sender_email, body, subject)
    tracking_num, carrier_id = extract_tracking(body, carrier_id)
    if not tracking_num:
        return None

    est_delivery = extract_delivery(body)
    tracking_url = get_tracking_url(carrier_id, tracking_num)
    carrier_name = CARRIERS.get(carrier_id, {}).get('name', carrier_id.upper())

    # One timestamp for both columns so they cannot differ within a row.
    now = datetime.now().isoformat()
    conn.execute('''INSERT INTO shipments (message_id, sender, carrier, tracking_number,
estimated_delivery, tracking_url, received_at, notified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                 (message_id, sender_name, carrier_name, tracking_num, est_delivery,
                  tracking_url, now, now))
    conn.commit()
    return {
        'sender': sender_name, 'carrier': carrier_name, 'tracking': tracking_num,
        'estimated': est_delivery or 'Unknown', 'url': tracking_url
    }
def format_notification(s):
    """Render a shipment dict as a Markdown notification message.

    Args:
        s: Mapping with keys ``sender``, ``carrier``, ``tracking``,
            ``estimated`` and ``url``.

    Returns:
        str: Multi-line message. The "Track" line is appended only when
        ``s['url']`` is truthy.
    """
    message = (
        "📦 **New Shipment Detected**\n"
        "\n"
        f"**From:** {s['sender']}\n"
        f"**Carrier:** {s['carrier']}\n"
        f"**Tracking:** `{s['tracking']}`\n"
        f"**Est. Delivery:** {s['estimated']}"
    )
    if s['url']:
        message += f"\n**Track:** <{s['url']}>"
    return message
def _imap_quote(value):
    """Escape backslashes and double quotes for use inside an IMAP quoted string."""
    return value.replace('\\', '\\\\').replace('"', '\\"')


def main():
    """Check the inbox for unseen shipping emails and print notifications.

    Logs in to the IMAP server, selects INBOX, searches for UNSEEN messages
    and passes each one to ``process_message``. Every shipment found is
    printed as a formatted notification followed by a ``---`` line; when none
    are found a single "No new shipments found." line is printed.

    Any exception raised while talking to the server is reported as an
    ``Error: ...`` line on stdout and does not discard shipments already
    collected. The socket and the database connection are closed on every
    path.

    Returns:
        list: The shipment dicts that were recorded during this run.
    """
    init_db()
    conn = sqlite3.connect(DB_PATH)
    shipments = []
    try:
        # The context manager closes the socket even when a step below raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(15)
            sock.connect((IMAP_HOST, IMAP_PORT))
            sock.recv(1024)  # consume the server greeting

            # Credentials are sent as quoted strings, so '"' and '\' in them
            # must be escaped or the command would be malformed.
            login = f'A1 LOGIN "{_imap_quote(USERNAME)}" "{_imap_quote(PASSWORD)}"\r\n'
            sock.sendall(login.encode())
            get_response(sock, "A1 OK")

            sock.sendall(b'A2 SELECT "INBOX"\r\n')
            get_response(sock, "A2 OK")

            sock.sendall(b'A3 SEARCH UNSEEN\r\n')
            resp = get_response(sock, "A3 OK").decode()

            msg_nums = []
            for line in resp.split('\r\n'):
                # The untagged "* SEARCH n n n" line carries the message numbers.
                if 'SEARCH' in line and '*' in line:
                    parts = line.split('SEARCH')
                    if len(parts) > 1:
                        msg_nums = [n for n in parts[1].strip().split() if n.isdigit()]

            for msg_num in msg_nums:
                s = process_message(sock, msg_num, conn)
                if s:
                    shipments.append(s)

            sock.sendall(b'A5 LOGOUT\r\n')
    except Exception as e:
        print(f"Error: {e}")
    finally:
        conn.close()

    # Output for Discord
    if shipments:
        for s in shipments:
            print(format_notification(s))
            print("---")
    else:
        print("No new shipments found.")
    return shipments


if __name__ == "__main__":
    main()