"""
Session Monitor
===============

Automatically tracks conversation transcripts and captures snapshots.
Runs via cron every 2 minutes.

Reads OpenClaw session transcripts from:
    ~/.openclaw/agents/main/sessions/*.jsonl

Usage:
    python session_monitor.py
"""
# Standard library
import json
import os
import sqlite3
import sys
from datetime import datetime
from pathlib import Path

# Make the parent directory importable so memory_vector resolves when this
# script is run directly (e.g. by cron) rather than as part of a package.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from memory_vector import store_memory

# SQLite database holding memory snapshots and session-tracking state.
DB_PATH = os.path.expanduser("~/.openclaw/memory.db")
# Directory where OpenClaw writes session transcript files (*.jsonl).
SESSIONS_DIR = os.path.expanduser("~/.openclaw/agents/main/sessions")
# Number of new user messages that must accumulate before a snapshot is taken.
SNAPSHOT_THRESHOLD = 15  # Messages between snapshots
def get_db():
    """Open a connection to the memory database.

    Rows are returned as sqlite3.Row objects so columns can be
    accessed by name.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def find_session_files():
    """Return all session transcript files, most recently modified first.

    Each entry is a dict with 'session_id' (filename stem), 'path'
    (string path), and 'mtime'. Returns an empty list when the
    sessions directory does not exist.
    """
    if not os.path.exists(SESSIONS_DIR):
        return []

    found = [
        {
            'session_id': transcript.stem,  # filename without extension
            'path': str(transcript),
            'mtime': os.path.getmtime(transcript),
        }
        for transcript in Path(SESSIONS_DIR).glob("*.jsonl")
    ]

    found.sort(key=lambda entry: entry['mtime'], reverse=True)
    return found
def parse_transcript(filepath, start_index=0):
    """Extract user messages from a JSONL transcript file.

    Starting at line *start_index*, collects every entry of type
    'message' whose role is 'user'. Each collected item is a dict with
    'index' (line number in the file), 'timestamp', and 'text'.

    Returns (messages, total_lines); ([], 0) if the file is missing
    or unreadable.
    """
    collected = []

    if not os.path.exists(filepath):
        return collected, 0

    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            all_lines = handle.readlines()
    except Exception as err:
        print(f"[ERROR] Cannot read {filepath}: {err}")
        return collected, 0

    # Process lines from start_index onwards; earlier lines were
    # already handled on a previous run.
    for line_no, raw in enumerate(all_lines[start_index:], start=start_index):
        if not raw.strip():
            continue

        try:
            entry = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip malformed lines silently

        # Only user messages count toward snapshots.
        if entry.get('type') != 'message':
            continue
        msg = entry.get('message', {})
        if msg.get('role') != 'user':
            continue

        # Content may be a list of typed parts (dicts) or bare strings.
        pieces = []
        for part in msg.get('content', []):
            if isinstance(part, dict) and part.get('type') == 'text':
                pieces.append(part.get('text', ''))
            elif isinstance(part, str):
                pieces.append(part)

        if pieces:
            collected.append({
                'index': line_no,
                'timestamp': entry.get('timestamp'),
                'text': ' '.join(pieces),
            })

    return collected, len(all_lines)
def generate_summary(messages):
    """Join message texts into a short summary string for embedding.

    Returns None for an empty list; otherwise the texts joined with
    ' | ', truncated to at most 1000 characters.
    """
    if not messages:
        return None

    joined = ' | '.join(m['text'] for m in messages)
    if len(joined) > 1000:
        return joined[:1000]
    return joined
def process_session(session_info):
    """
    Process a single session and snapshot it when the threshold is hit.

    Resumes reading the session's transcript from the last recorded line
    index (stored in the session_tracking table), counts new user
    messages, and once SNAPSHOT_THRESHOLD messages have accumulated,
    embeds a summary via the local Ollama server and stores it with
    store_memory().

    Args:
        session_info: dict with 'session_id' and 'path' keys, as
            produced by find_session_files().

    Returns (snapshot_created, message_count).
    """
    conn = get_db()
    cursor = conn.cursor()

    session_id = session_info['session_id']
    filepath = session_info['path']

    # Get or create tracking record
    cursor.execute('''
        SELECT * FROM session_tracking WHERE session_id = ?
    ''', (session_id,))

    tracking = cursor.fetchone()

    if tracking is None:
        # New session: start tracking from the top of the transcript.
        cursor.execute('''
            INSERT INTO session_tracking
            (session_id, transcript_path, last_message_index, messages_since_snapshot)
            VALUES (?, ?, 0, 0)
        ''', (session_id, filepath))
        conn.commit()
        last_index = 0
        since_snapshot = 0
    else:
        last_index = tracking['last_message_index']
        since_snapshot = tracking['messages_since_snapshot']

    # Parse new messages (transcript lines after last_index only).
    messages, total_lines = parse_transcript(filepath, last_index)

    if not messages:
        conn.close()
        return False, 0

    new_count = len(messages)
    since_snapshot += new_count

    print(f"  [{session_id[:8]}...]: {new_count} new messages, {since_snapshot} since snapshot")

    snapshot_created = False

    # Check if threshold reached
    if since_snapshot >= SNAPSHOT_THRESHOLD:
        # Summarize only the most recent SNAPSHOT_THRESHOLD messages.
        summary = generate_summary(messages[-SNAPSHOT_THRESHOLD:])

        if summary and len(summary) > 50:  # Only snapshot if substantive
            try:
                # Get embedding from Ollama (local embedding server;
                # assumes nomic-embed-text is available — TODO confirm).
                import requests
                response = requests.post(
                    "http://localhost:11434/api/embeddings",
                    json={"model": "nomic-embed-text", "prompt": summary[:2000]},
                    timeout=30
                )
                embedding = response.json()["embedding"]

                # Store in database; the source_path encodes session and
                # wall-clock time so each snapshot gets a distinct key.
                source_path = f"session://{session_id}#{datetime.now().strftime('%H:%M')}"
                store_memory(
                    source_type="auto_session",
                    source_path=source_path,
                    content=summary,
                    embedding=embedding
                )

                print(f"  [OK] Snapshot saved: {source_path}")
                since_snapshot = 0
                snapshot_created = True

            except Exception as e:
                # Embedding or storage failed: since_snapshot stays
                # elevated, so the snapshot is retried on the next run.
                print(f"  [ERROR] Failed to create snapshot: {e}")
        else:
            print(f"  [SKIP] Content too short for snapshot")
            since_snapshot = 0  # Reset anyway to avoid getting stuck

    # Update tracking. last_message_index is set to the total line count,
    # so the next run resumes after everything read this time.
    cursor.execute('''
        UPDATE session_tracking
        SET last_message_index = ?,
            messages_since_snapshot = ?,
            last_checkpoint_time = CURRENT_TIMESTAMP,
            transcript_path = ?
        WHERE session_id = ?
    ''', (total_lines, since_snapshot, filepath, session_id))

    conn.commit()
    conn.close()

    return snapshot_created, new_count
def cleanup_old_sessions():
    """Delete session transcript files not modified in the last 24 hours.

    Returns the number of files successfully removed.
    """
    cutoff_time = datetime.now().timestamp() - 24 * 3600  # 24 hours ago

    removed = 0
    for candidate in Path(SESSIONS_DIR).glob("*.jsonl"):
        if candidate.stat().st_mtime >= cutoff_time:
            continue
        try:
            candidate.unlink()
            removed += 1
            print(f"  [CLEANUP] Deleted: {candidate.name}")
        except Exception as err:
            print(f"  [CLEANUP] Failed to delete {candidate.name}: {err}")

    return removed
def main():
    """Scan all sessions, snapshot where due, and clean up stale files."""
    banner = '=' * 60
    print(f"\n{banner}")
    print("Session Monitor")
    print(f"{banner}")
    print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Find session files
    sessions = find_session_files()
    if not sessions:
        print("[INFO] No active session files found")
        return

    print(f"Found {len(sessions)} session file(s)")
    print()

    total_snapshots = 0
    total_messages = 0
    for info in sessions:
        made_snapshot, message_count = process_session(info)
        total_messages += message_count
        if made_snapshot:
            total_snapshots += 1

    # Cleanup old sessions
    deleted = cleanup_old_sessions()

    print()
    print(f"{banner}")
    print(f"Summary: {total_messages} messages, {total_snapshots} snapshots, {deleted} files cleaned")
    print(f"{banner}\n")
# Allow running directly (e.g. from cron) without importing as a module.
if __name__ == "__main__":
    main()