Files
controls-web/ai_agents/db_agent/extractor/schema_snapshot.py
2026-02-17 09:29:34 -06:00

176 lines
5.8 KiB
Python

"""Extract schema metadata from the SugarScale MSSQL database."""
from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List
from urllib.parse import quote_plus

import sqlalchemy as sa
from sqlalchemy.engine import Engine

from .settings import DbSettings
@dataclass
class TableColumn:
    """One row of INFORMATION_SCHEMA.COLUMNS for a single table column."""

    table_schema: str  # TABLE_SCHEMA, e.g. "dbo"
    table_name: str  # TABLE_NAME (unqualified)
    column_name: str  # COLUMN_NAME
    data_type: str  # DATA_TYPE, e.g. "nvarchar", "int"
    is_nullable: bool  # derived from IS_NULLABLE == "YES" in fetch_columns
    column_default: str | None  # COLUMN_DEFAULT; None when no default is defined
    character_maximum_length: int | None  # CHARACTER_MAXIMUM_LENGTH; None for non-character types
@dataclass
class ForeignKeyRelation:
    """One column pair of a foreign key, read from sys.foreign_key_columns.

    A multi-column foreign key produces one instance per column pair.
    """

    source_schema: str  # schema of the referencing (child) table
    source_table: str  # referencing table name
    source_column: str  # referencing column name
    target_schema: str  # schema of the referenced (parent) table
    target_table: str  # referenced table name
    target_column: str  # referenced column name
def create_engine(settings: DbSettings) -> Engine:
    """Build a SQLAlchemy engine for the MSSQL database described by *settings*.

    The raw ODBC connection string is URL-encoded and passed through the
    ``odbc_connect`` query parameter so that characters like ``;``, ``=`` and
    punctuation in passwords survive intact.
    """
    # Use urllib.parse.quote_plus directly: it is the documented recipe for
    # pyodbc pass-through connection strings. sa.engine.url.quote_plus is a
    # private re-export inside SQLAlchemy that may vanish between releases.
    connection_uri = "mssql+pyodbc:///?odbc_connect=" + quote_plus(settings.connection_string())
    # fast_executemany speeds up bulk parameter binding in the pyodbc driver.
    return sa.create_engine(connection_uri, fast_executemany=True)
def fetch_columns(engine: Engine, schema: str | None = None) -> List[TableColumn]:
    """Return every column visible in INFORMATION_SCHEMA.COLUMNS.

    When *schema* is given, only columns of tables in that schema are
    returned; with ``None`` the filter collapses and all schemas match.
    Results are ordered by schema, table, then ordinal position.
    """
    statement = sa.text(
        """
        SELECT
        TABLE_SCHEMA,
        TABLE_NAME,
        COLUMN_NAME,
        DATA_TYPE,
        IS_NULLABLE,
        COLUMN_DEFAULT,
        CHARACTER_MAXIMUM_LENGTH
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE (:schema IS NULL OR TABLE_SCHEMA = :schema)
        ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
        """
    )
    with engine.connect() as connection:
        rows = connection.execute(statement, {"schema": schema})
        # INFORMATION_SCHEMA reports nullability as the string YES/NO.
        return [
            TableColumn(
                table_schema=record.TABLE_SCHEMA,
                table_name=record.TABLE_NAME,
                column_name=record.COLUMN_NAME,
                data_type=record.DATA_TYPE,
                is_nullable=(record.IS_NULLABLE == "YES"),
                column_default=record.COLUMN_DEFAULT,
                character_maximum_length=record.CHARACTER_MAXIMUM_LENGTH,
            )
            for record in rows
        ]
def fetch_foreign_keys(engine: Engine, schema: str | None = None) -> List[ForeignKeyRelation]:
    """Return one ForeignKeyRelation per FK column pair from the sys catalog.

    When *schema* is given, only foreign keys whose referencing table lives
    in that schema are returned; with ``None`` every schema matches.
    """
    statement = sa.text(
        """
        SELECT
        fk_tab.name AS source_table,
        fk_col.name AS source_column,
        fk_sch.name AS source_schema,
        pk_tab.name AS target_table,
        pk_col.name AS target_column,
        pk_sch.name AS target_schema
        FROM sys.foreign_key_columns fkc
        INNER JOIN sys.tables fk_tab ON fkc.parent_object_id = fk_tab.object_id
        INNER JOIN sys.schemas fk_sch ON fk_tab.schema_id = fk_sch.schema_id
        INNER JOIN sys.columns fk_col ON fkc.parent_object_id = fk_col.object_id AND fkc.parent_column_id = fk_col.column_id
        INNER JOIN sys.tables pk_tab ON fkc.referenced_object_id = pk_tab.object_id
        INNER JOIN sys.schemas pk_sch ON pk_tab.schema_id = pk_sch.schema_id
        INNER JOIN sys.columns pk_col ON fkc.referenced_object_id = pk_col.object_id AND fkc.referenced_column_id = pk_col.column_id
        WHERE (:schema IS NULL OR fk_sch.name = :schema)
        ORDER BY fk_sch.name, fk_tab.name
        """
    )
    with engine.connect() as connection:
        rows = connection.execute(statement, {"schema": schema})
        return [
            ForeignKeyRelation(
                source_schema=record.source_schema,
                source_table=record.source_table,
                source_column=record.source_column,
                target_schema=record.target_schema,
                target_table=record.target_table,
                target_column=record.target_column,
            )
            for record in rows
        ]
def assemble_schema_document(columns: Iterable[TableColumn], relations: Iterable[ForeignKeyRelation]) -> Dict[str, Any]:
    """Merge column and FK metadata into a single nested schema document.

    The result maps ``"schema.table"`` keys to entries holding the table's
    schema, name, column dicts, and outgoing relationship dicts. A table
    that only appears as a FK source still gets an entry (with no columns).
    """
    tables: Dict[str, Any] = {}

    def entry(schema_name: str, table_name: str) -> Dict[str, Any]:
        # Fetch-or-create the per-table entry keyed by "schema.table".
        key = f"{schema_name}.{table_name}"
        if key not in tables:
            tables[key] = {
                "schema": schema_name,
                "name": table_name,
                "columns": [],
                "relationships": [],
            }
        return tables[key]

    for column in columns:
        entry(column.table_schema, column.table_name)["columns"].append(
            {
                "name": column.column_name,
                "type": column.data_type,
                "nullable": column.is_nullable,
                "default": column.column_default,
                "max_length": column.character_maximum_length,
            }
        )

    for relation in relations:
        entry(relation.source_schema, relation.source_table)["relationships"].append(
            {
                "target": f"{relation.target_schema}.{relation.target_table}",
                "source_column": relation.source_column,
                "target_column": relation.target_column,
            }
        )

    return {"tables": tables}
def write_schema_json(output_path: Path, schema_doc: Dict[str, Any]) -> None:
    """Write *schema_doc* to *output_path* as pretty-printed, sorted JSON.

    Parent directories are created as needed. Keys are sorted so repeated
    snapshots diff cleanly.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        # ensure_ascii=False: the file is already UTF-8, so keep non-ASCII
        # identifiers readable instead of \uXXXX-escaping them.
        json.dump(schema_doc, f, indent=2, sort_keys=True, ensure_ascii=False)
        f.write("\n")  # trailing newline for POSIX tools and clean diffs
def run(schema: str | None = None, output: Path | None = None) -> Path:
    """Snapshot the database schema to a JSON file and return its path.

    Connection settings come from the environment via DbSettings.from_env().
    *schema* optionally restricts extraction to one schema; *output*
    overrides the default output location.
    """
    settings = DbSettings.from_env()
    engine = create_engine(settings)
    try:
        columns = fetch_columns(engine, schema=schema)
        relations = fetch_foreign_keys(engine, schema=schema)
    finally:
        # Release pooled ODBC connections even if a query fails; the
        # original leaked the engine on any exception.
        engine.dispose()
    schema_doc = assemble_schema_document(columns, relations)
    output_path = output or Path("db_agent/context/schema.json")
    write_schema_json(output_path, schema_doc)
    return output_path