"""
Memory Embedding Worker
=======================
Process memory files and store with embeddings in SQLite.
Runs as cron job or standalone.

Usage:
    python memory_embedding_worker.py [--date YYYY-MM-DD] [--dry-run]
"""

import os
import sys

# Add parent dir to path for memory_vector
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import argparse
import json
import re
import requests
from datetime import datetime, timedelta
from typing import List, Tuple, Optional

from memory_vector import setup_memory_vectors, store_memory, MemoryVectorDB

# Ollama configuration
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
CHUNK_SIZE = 512  # Characters per chunk

# Prompts longer than this are truncated before being sent for embedding.
MAX_PROMPT_CHARS = 8192
# Only this many leading characters of a chunk are stored as its preview.
PREVIEW_CHARS = 500
# Sections whose body is not longer than this are skipped as too small.
MIN_SECTION_CHARS = 50

# A level-2 or level-3 markdown header at the start of a line, followed by
# everything up to the next such header (or end of input). Anchoring with
# ^ / MULTILINE keeps "## " in the middle of a line, and deeper headers such
# as "####", from being treated as section boundaries.
_SECTION_RE = re.compile(
    r'^#{2,3}[ \t]+([^\n]+?)[ \t]*\n(.*?)(?=^#{2,3}[ \t]|\Z)',
    re.DOTALL | re.MULTILINE,
)


def generate_embedding(text: str) -> Optional[List[float]]:
    """Generate an embedding for *text* using the Ollama embeddings endpoint.

    The prompt is truncated to MAX_PROMPT_CHARS characters before sending.

    Returns:
        The value of the "embedding" field of the JSON response, or None if
        the request failed or the response did not have the expected shape
        (the error is printed, not raised).
    """
    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={
                "model": EMBED_MODEL,
                "prompt": text[:MAX_PROMPT_CHARS],  # Truncate if too long
            },
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["embedding"]
    except (requests.RequestException, KeyError, ValueError, TypeError) as e:
        # Best-effort: network/HTTP errors, invalid JSON, or a response that
        # lacks "embedding" are reported and signalled to the caller as None.
        print(f"[ERROR] Failed to generate embedding: {e}")
        return None


def _split_oversized(text: str, max_chars: int) -> List[str]:
    """Split *text* into stripped, non-empty pieces of at most *max_chars*.

    Cuts at the last space or newline inside each window when there is one,
    otherwise at exactly *max_chars*. Requires ``max_chars >= 1``.
    """
    pieces = []
    rest = text
    while len(rest) > max_chars:
        window = rest[:max_chars + 1]
        cut = max(window.rfind(" "), window.rfind("\n"))
        if cut <= 0:
            # No usable whitespace in the window: hard cut.
            cut = max_chars
        pieces.append(rest[:cut])
        rest = rest[cut:].lstrip()
    pieces.append(rest)
    stripped = (piece.strip() for piece in pieces)
    return [piece for piece in stripped if piece]


def chunk_text(text: str, max_chars: int = CHUNK_SIZE) -> List[str]:
    """Split text into chunks for embedding.

    Paragraphs (separated by blank lines) are packed greedily into chunks of
    at most *max_chars* characters. A single paragraph longer than
    *max_chars* is split further, so that no chunk silently exceeds the
    limit and gets truncated at embedding time.

    Returns:
        A non-empty list of chunks. If no chunk could be built (e.g. the
        text is empty), the list contains ``text[:max_chars]``.
    """
    chunks = []
    current_chunk = ""

    # Split by paragraphs first
    for para in text.split('\n\n'):
        if len(current_chunk) + len(para) + 2 <= max_chars:
            current_chunk += f"\n\n{para}" if current_chunk else para
            continue

        if current_chunk:
            chunks.append(current_chunk.strip())

        if max_chars > 0 and len(para) > max_chars:
            # Oversized paragraph: emit the full pieces now and keep the
            # last one open so following paragraphs can still join it.
            pieces = _split_oversized(para, max_chars)
            chunks.extend(pieces[:-1])
            current_chunk = pieces[-1] if pieces else ""
        else:
            current_chunk = para

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks if chunks else [text[:max_chars]]


def extract_sections(content: str) -> List[Tuple[str, str]]:
    """Extract titled sections from markdown content.

    A section starts at a line beginning with ``##`` or ``###`` and runs to
    the next such line or the end of the content. Sections whose stripped
    body is not longer than MIN_SECTION_CHARS characters are skipped. Text
    before the first header is not part of any section.

    Returns:
        A list of (title, body) tuples. If no section qualifies and the
        content is not blank, a single ("General", content) entry.
    """
    sections = []

    for title, body in _SECTION_RE.findall(content):
        title = title.strip()
        body = body.strip()
        if len(body) > MIN_SECTION_CHARS:  # Skip empty or tiny sections
            sections.append((title, body))

    # If no sections found, treat whole content as one section
    if not sections and content.strip():
        sections.append(("General", content.strip()))

    return sections


def _embed_file(filepath: str, source_type: str, dry_run: bool) -> Tuple[int, int]:
    """Embed every chunk of one file; return (entries_created, chunks_failed).

    In dry-run mode embeddings are still generated, but nothing is passed to
    store_memory; successfully embedded chunks are counted as created.
    """
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except FileNotFoundError:
        print(f"[SKIP] File not found: {filepath}")
        return 0, 0

    # Get filename as source_path reference
    source_path = os.path.basename(filepath)

    sections = extract_sections(content)
    print(f"[PROCESS] {source_path}: {len(sections)} sections found")

    created = 0
    failed = 0
    for section_title, section_content in sections:
        chunks = chunk_text(section_content)
        total = len(chunks)

        for index, chunk in enumerate(chunks, start=1):
            embedding = generate_embedding(chunk)
            if not embedding:
                failed += 1
                print(f" [FAILED] Chunk {index}/{total}")
                continue

            if dry_run:
                created += 1
                print(f" [DRY RUN] Chunk {index}/{total} (not stored)")
                continue

            # Only label chunks when a section was split into several.
            if total > 1:
                chunk_label = f"{section_title} (chunk {index}/{total})"
            else:
                chunk_label = section_title

            # NOTE(review): nothing here prevents duplicate rows when the
            # same file is processed again — confirm how store_memory
            # handles repeated source_path values.
            store_memory(
                source_type=source_type,
                source_path=f"{source_path}#{chunk_label}",
                content=chunk[:PREVIEW_CHARS],  # Store preview
                embedding=embedding,
            )
            created += 1
            print(f" [STORED] Chunk {index}/{total}")

    return created, failed


def process_memory_file(filepath: str, source_type: str, dry_run: bool = False) -> int:
    """
    Process a single memory file and store with embeddings.
    Returns number of entries created.

    Args:
        filepath: Path to the memory file
        source_type: 'daily', 'memory_md', 'project', etc.
        dry_run: If True, generate embeddings but do not store them; the
            return value is then the number of entries that would have
            been created.
    """
    created, _failed = _embed_file(filepath, source_type, dry_run)
    return created


def get_memory_files(date: Optional[str] = None) -> List[Tuple[str, str]]:
    """
    Get list of memory files to process.
    Returns list of (filepath, source_type) tuples.

    Args:
        date: If given, only the daily note ``memory/<date>.md`` is
            considered; otherwise yesterday's and today's daily notes.

    MEMORY.md is included in either case, but only when it was modified
    within the last 24 hours. Files that do not exist are left out.
    """
    files = []
    workspace = os.path.expanduser("~/.openclaw/workspace")
    memory_dir = os.path.join(workspace, "memory")
    now = datetime.now()

    if date:
        # Specific date
        day_names = [date]
    else:
        # Yesterday's daily note, plus today's (in case running during day)
        day_names = [
            (now - timedelta(days=1)).strftime("%Y-%m-%d"),
            now.strftime("%Y-%m-%d"),
        ]

    for day_name in day_names:
        daily_path = os.path.join(memory_dir, f"{day_name}.md")
        if os.path.exists(daily_path):
            files.append((daily_path, "daily"))

    # Include MEMORY.md only if it was modified in the last 24h
    memory_md = os.path.join(workspace, "MEMORY.md")
    try:
        mtime = datetime.fromtimestamp(os.path.getmtime(memory_md))
    except OSError:
        # Missing (or unreadable) MEMORY.md: nothing to add.
        mtime = None
    if mtime is not None and now - mtime < timedelta(hours=24):
        files.append((memory_md, "memory_md"))

    return files


def run_daily_sync(date: Optional[str] = None, dry_run: bool = False) -> dict:
    """
    Run the daily memory embedding sync.

    Args:
        date: Specific date to process (YYYY-MM-DD), or None for yesterday
        dry_run: If True, don't actually store embeddings

    Returns:
        Dict with stats: "files" (files found), "entries" (entries created,
        or that would be created in a dry run) and "failed" (chunks for
        which no embedding could be generated).
    """
    print("=" * 50)
    print("Memory Embedding Worker")
    print("=" * 50)

    # Setup database
    if not dry_run:
        setup_memory_vectors()
        print("[OK] Database ready\n")
    else:
        print("[DRY RUN] No database changes\n")

    # Get files to process
    files = get_memory_files(date)

    if not files:
        print("[INFO] No memory files to process")
        return {"files": 0, "entries": 0, "failed": 0}

    print(f"[INFO] Processing {len(files)} file(s):\n")
    for f, t in files:
        print(f" - {f} ({t})")
    print()

    # Process each file
    stats = {"files": len(files), "entries": 0, "failed": 0}

    for filepath, source_type in files:
        print(f"\n[FILE] {os.path.basename(filepath)}")
        # dry_run must be forwarded, otherwise a dry run still writes rows.
        entries, failed = _embed_file(filepath, source_type, dry_run)
        stats["entries"] += entries
        stats["failed"] += failed
        print(f" Created {entries} embedding entries")

    # Summary
    print("\n" + "=" * 50)
    print("SUMMARY")
    print("=" * 50)
    print(f"Files processed: {stats['files']}")
    print(f"Embedding entries: {stats['entries']}")
    print(f"Failed chunks: {stats['failed']}")

    return stats


def _iso_date(value: str) -> str:
    """argparse type: validate a YYYY-MM-DD date and return it zero-padded.

    The value ends up in a file name, so anything that is not a real date
    (including path fragments) is rejected up front.
    """
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}, expected YYYY-MM-DD"
        )
    return parsed.strftime("%Y-%m-%d")


def main():
    """CLI entry point.

    Returns the process exit code: 0 if at least one entry was created,
    1 otherwise.
    """
    parser = argparse.ArgumentParser(description='Memory Embedding Worker')
    parser.add_argument('--date', type=_iso_date, help='Process specific date (YYYY-MM-DD)')
    parser.add_argument('--dry-run', action='store_true', help='Test without storing')

    args = parser.parse_args()

    stats = run_daily_sync(date=args.date, dry_run=args.dry_run)

    # Return exit code: 0 if success, 1 if no entries
    return 0 if stats["entries"] > 0 else 1


if __name__ == "__main__":
    sys.exit(main())