Files
openclaw-workspace/docker/discord-voice-bot/openclaw_mcp_server.py
2026-04-11 09:45:12 -05:00

134 lines
3.6 KiB
Python

from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from typing import Any
import logging
from loguru import logger
from mcp.server.fastmcp import FastMCP
# Disable loguru logging for stdio transport
logger.remove()
logging.getLogger().setLevel(logging.CRITICAL)
mcp = FastMCP("openclaw")
@mcp.tool()
def read_file(path: str) -> str:
"""Read a file from the OpenClaw workspace."""
try:
file_path = Path(path)
if not file_path.is_absolute():
# Use OpenClaw workspace as base
workspace = Path("C:\\Users\\admin\\.openclaw\\workspace")
file_path = workspace / file_path
if not file_path.exists():
return json.dumps({"error": f"File not found: {path}"})
content = file_path.read_text(encoding="utf-8", errors="replace")
# Limit response size
if len(content) > 50000:
content = content[:50000] + "\n... [truncated]"
return json.dumps({"content": content, "path": str(file_path)})
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def write_file(path: str, content: str) -> str:
"""Write content to a file in the OpenClaw workspace."""
try:
file_path = Path(path)
if not file_path.is_absolute():
workspace = Path("C:\\Users\\admin\\.openclaw\\workspace")
file_path = workspace / file_path
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
return json.dumps({
"status": "written",
"path": str(file_path),
"bytes": len(content.encode("utf-8"))
})
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def exec_command(command: str) -> str:
"""Execute a shell command in the OpenClaw workspace."""
try:
import subprocess
workspace = Path("C:\\Users\\admin\\.openclaw\\workspace")
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=30,
cwd=str(workspace)
)
return json.dumps({
"stdout": result.stdout[:10000],
"stderr": result.stderr[:10000],
"exit_code": result.returncode
})
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def list_files(directory: str = ".") -> str:
"""List files in the OpenClaw workspace."""
try:
workspace = Path("C:\\Users\\admin\\.openclaw\\workspace")
target_dir = workspace / directory
if not target_dir.exists():
return json.dumps({"error": f"Directory not found: {directory}"})
files = []
for f in target_dir.iterdir():
if f.is_file():
files.append(str(f.relative_to(workspace)))
return json.dumps({
"directory": directory,
"files": files[:100],
"count": len(files)
})
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def get_status() -> str:
"""Get OpenClaw status and available tools."""
return json.dumps({
"status": "running",
"tools": ["read_file", "write_file", "exec_command", "list_files", "get_status"],
"workspace": "C:\\Users\\admin\\.openclaw\\workspace"
})
def main() -> None:
"""Run the MCP server."""
mcp.run()
if __name__ == "__main__":
main()