"""
|
|
Memory Embedding Worker
|
|
=======================
|
|
Process memory files and store with embeddings in SQLite.
|
|
Runs as cron job or standalone.
|
|
|
|
Usage:
|
|
python memory_embedding_worker.py [--date YYYY-MM-DD]
|
|
"""
|
|
|
|
import os
import sys

# Add parent dir to path for memory_vector
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import re
import requests
from datetime import datetime, timedelta
from typing import List, Tuple, Optional

from memory_vector import setup_memory_vectors, store_memory, MemoryVectorDB

# Ollama configuration
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
CHUNK_SIZE = 512  # Characters per chunk


def generate_embedding(text: str) -> Optional[List[float]]:
    """Generate an embedding for *text* via the local Ollama API.

    Args:
        text: Input text; truncated to 8192 characters before embedding
            to stay within the model's context window.

    Returns:
        The embedding as a list of floats, or None on any failure
        (network/HTTP error, non-JSON body, or a response without an
        "embedding" field).
    """
    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={
                "model": EMBED_MODEL,
                "prompt": text[:8192]  # Truncate if too long
            },
            timeout=60
        )
        response.raise_for_status()
        return response.json()["embedding"]
    except (requests.RequestException, ValueError, KeyError) as e:
        # Narrowed from a bare `except Exception`: catch only the
        # failure modes this call can actually produce, so programming
        # errors are no longer silently swallowed.
        print(f"[ERROR] Failed to generate embedding: {e}")
        return None


def chunk_text(text: str, max_chars: int = CHUNK_SIZE) -> List[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    Paragraphs (separated by blank lines) are packed greedily into
    chunks. A paragraph longer than *max_chars* is hard-split at the
    limit — the original implementation emitted such paragraphs as a
    single oversized chunk, breaking the size contract.

    Args:
        text: Source text to split.
        max_chars: Upper bound on the length of each returned chunk.

    Returns:
        A non-empty list of chunks; for empty input the list contains
        one (possibly empty) truncated chunk, matching prior behavior.
    """
    chunks: List[str] = []
    current_chunk = ""

    def _flush() -> None:
        # Emit the accumulated chunk (if any) and reset the buffer.
        nonlocal current_chunk
        if current_chunk:
            chunks.append(current_chunk.strip())
            current_chunk = ""

    for para in text.split('\n\n'):
        if len(current_chunk) + len(para) + 2 <= max_chars:
            # Paragraph fits alongside what we have: append it.
            current_chunk += f"\n\n{para}" if current_chunk else para
        else:
            _flush()
            if len(para) > max_chars:
                # Oversized paragraph: hard-split at the limit. The
                # final slice stays in the buffer so following small
                # paragraphs can still be packed with it.
                for start in range(0, len(para), max_chars):
                    piece = para[start:start + max_chars]
                    if start + max_chars < len(para):
                        chunks.append(piece.strip())
                    else:
                        current_chunk = piece
            else:
                current_chunk = para

    _flush()
    return chunks if chunks else [text[:max_chars]]


def extract_sections(content: str) -> List[Tuple[str, str]]:
    """Extract (title, body) pairs for each ``##``/``###`` markdown section.

    Sections whose stripped body is 50 characters or fewer are dropped.
    If nothing qualifies and the content is non-blank, the whole text is
    returned under the single title "General"; blank content yields [].
    """
    header_pattern = r'#{2,3}\s+(.+?)\n(.*?)(?=#{2,3}\s+|\Z)'

    candidates = [
        (match.group(1).strip(), match.group(2).strip())
        for match in re.finditer(header_pattern, content, re.DOTALL)
    ]
    # Skip empty or tiny sections
    sections = [(title, body) for title, body in candidates
                if body and len(body) > 50]

    if sections:
        return sections

    # No usable sections: fall back to the whole document as one section.
    remainder = content.strip()
    return [("General", remainder)] if remainder else []


def process_memory_file(filepath: str, source_type: str) -> int:
    """Embed one memory file and persist the results.

    Each ##/### section of the file is chunked, embedded via Ollama,
    and stored through store_memory().

    Args:
        filepath: Path to the memory file.
        source_type: 'daily', 'memory_md', 'project', etc.

    Returns:
        Number of embedding entries created (0 if the file is missing).
    """
    if not os.path.exists(filepath):
        print(f"[SKIP] File not found: {filepath}")
        return 0

    with open(filepath, 'r', encoding='utf-8', errors='replace') as handle:
        text = handle.read()

    # Bare filename is used as the source_path reference.
    source_path = os.path.basename(filepath)

    sections = extract_sections(text)
    print(f"[PROCESS] {source_path}: {len(sections)} sections found")

    stored = 0
    for title, body in sections:
        pieces = chunk_text(body)
        total = len(pieces)

        for index, piece in enumerate(pieces, start=1):
            vector = generate_embedding(piece)
            if vector is None:
                print(f"  [FAILED] Chunk {index}/{total}")
                continue

            # Disambiguate multi-chunk sections in the stored path.
            label = f"{title} (chunk {index}/{total})" if total > 1 else title
            store_memory(
                source_type=source_type,
                source_path=f"{source_path}#{label}",
                content=piece[:500],  # Store preview
                embedding=vector
            )
            stored += 1
            print(f"  [STORED] Chunk {index}/{total}")

    return stored


def get_memory_files(date: Optional[str] = None) -> List[Tuple[str, str]]:
    """Collect memory files that should be embedded.

    When *date* is given, only that day's note is considered. Otherwise
    yesterday's note and (if distinct and present) today's note are
    candidates. MEMORY.md is appended whenever it was modified within
    the last 24 hours.

    Returns:
        List of (filepath, source_type) tuples; may be empty.
    """
    workspace = os.path.expanduser("~/.openclaw/workspace")
    memory_dir = os.path.join(workspace, "memory")
    candidates: List[Tuple[str, str]] = []

    if date:
        # Specific date requested.
        daily_path = os.path.join(memory_dir, f"{date}.md")
        if os.path.exists(daily_path):
            candidates.append((daily_path, "daily"))
    else:
        now = datetime.now()

        # Yesterday's daily note.
        yesterday = (now - timedelta(days=1)).strftime("%Y-%m-%d")
        daily_path = os.path.join(memory_dir, f"{yesterday}.md")
        if os.path.exists(daily_path):
            candidates.append((daily_path, "daily"))

        # Today's note too, in case the worker runs mid-day.
        today_path = os.path.join(memory_dir, f"{now.strftime('%Y-%m-%d')}.md")
        if today_path != daily_path and os.path.exists(today_path):
            candidates.append((today_path, "daily"))

    # MEMORY.md is only reprocessed when touched within the last day.
    memory_md = os.path.join(workspace, "MEMORY.md")
    if os.path.exists(memory_md):
        modified = datetime.fromtimestamp(os.path.getmtime(memory_md))
        if datetime.now() - modified < timedelta(hours=24):
            candidates.append((memory_md, "memory_md"))

    return candidates


def run_daily_sync(date: Optional[str] = None, dry_run: bool = False) -> dict:
    """Run the daily memory embedding sync.

    Args:
        date: Specific date to process (YYYY-MM-DD), or None for yesterday
        dry_run: If True, don't actually store embeddings — only report
            which files would be processed.

    Returns:
        Dict with stats ("files", "entries", and "failed" when any
        files were found).
    """
    print("=" * 50)
    print("Memory Embedding Worker")
    print("=" * 50)

    # Setup database
    if not dry_run:
        setup_memory_vectors()
        print("[OK] Database ready\n")
    else:
        print("[DRY RUN] No database changes\n")

    # Get files to process
    files = get_memory_files(date)

    if not files:
        print("[INFO] No memory files to process")
        return {"files": 0, "entries": 0}

    print(f"[INFO] Processing {len(files)} file(s):\n")
    for f, t in files:
        print(f"  - {f} ({t})")
    print()

    stats = {"files": len(files), "entries": 0, "failed": 0}

    # BUG FIX: dry_run previously only skipped setup_memory_vectors()
    # and then still embedded AND stored every chunk via
    # process_memory_file(). A dry run now stops after listing files.
    if dry_run:
        print("[DRY RUN] Skipping embedding and storage")
        return stats

    # Process each file
    total_entries = 0
    for filepath, source_type in files:
        print(f"\n[FILE] {os.path.basename(filepath)}")
        entries = process_memory_file(filepath, source_type)
        total_entries += entries
        print(f"  Created {entries} embedding entries")

    stats["entries"] = total_entries

    # Summary
    print("\n" + "=" * 50)
    print("SUMMARY")
    print("=" * 50)
    print(f"Files processed: {stats['files']}")
    print(f"Embedding entries: {stats['entries']}")

    return stats


def main():
    """CLI entry point; returns a process exit code."""
    import argparse

    parser = argparse.ArgumentParser(description='Memory Embedding Worker')
    parser.add_argument('--date', help='Process specific date (YYYY-MM-DD)')
    parser.add_argument('--dry-run', action='store_true', help='Test without storing')
    args = parser.parse_args()

    result = run_daily_sync(date=args.date, dry_run=args.dry_run)

    # Exit 0 only when at least one embedding entry was created.
    return 0 if result["entries"] > 0 else 1


if __name__ == "__main__":
    sys.exit(main())