176 lines
5.8 KiB
Python
176 lines
5.8 KiB
Python
"""Extract schema metadata from the SugarScale MSSQL database."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List
|
|
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.engine import Engine
|
|
|
|
from .settings import DbSettings
|
|
|
|
|
|
@dataclass
class TableColumn:
    """Metadata describing a single column of a database table.

    The field order is part of the generated ``__init__`` signature, so it
    must not be rearranged.
    """

    # Schema and table that own the column.
    table_schema: str
    table_name: str

    # Column identifier and its declared data type name.
    column_name: str
    data_type: str

    # True when the column accepts NULL values.
    is_nullable: bool

    # Default expression for the column, or None when there is none.
    column_default: str | None

    # Maximum character length, or None when not reported for the column.
    character_maximum_length: int | None
|
|
|
|
|
|
@dataclass
class ForeignKeyRelation:
    """One column-level foreign key link between two tables.

    The ``source_*`` fields identify the referencing column and the
    ``target_*`` fields identify the column it points at. The field order is
    part of the generated ``__init__`` signature, so it must not be
    rearranged.
    """

    # Referencing side of the relationship.
    source_schema: str
    source_table: str
    source_column: str

    # Referenced side of the relationship.
    target_schema: str
    target_table: str
    target_column: str
|
|
|
|
|
|
def create_engine(settings: DbSettings) -> Engine:
    """Build a SQLAlchemy engine for the database described by *settings*.

    The string returned by ``settings.connection_string()`` is URL-encoded and
    passed to the ``mssql+pyodbc`` dialect through its ``odbc_connect`` query
    parameter.

    Args:
        settings: Object whose ``connection_string()`` method returns the
            connection string to pass through.

    Returns:
        An engine created with ``fast_executemany=True``.
    """
    # Function-scope import: use the public stdlib quoting helper rather than
    # ``sa.engine.url.quote_plus``, which is not a documented SQLAlchemy name
    # and is not guaranteed to exist across SQLAlchemy versions.
    from urllib.parse import quote_plus

    encoded = quote_plus(settings.connection_string())
    connection_uri = "mssql+pyodbc:///?odbc_connect=" + encoded
    return sa.create_engine(connection_uri, fast_executemany=True)
|
|
|
|
|
|
def fetch_columns(engine: Engine, schema: str | None = None) -> List[TableColumn]:
    """Read column metadata from ``INFORMATION_SCHEMA.COLUMNS``.

    Args:
        engine: Engine used to open the connection the query runs on.
        schema: When given, only rows whose ``TABLE_SCHEMA`` equals this value
            are returned. ``None`` disables the filter.

    Returns:
        One ``TableColumn`` per result row, in the order produced by the
        query (schema, table, ordinal position).
    """
    statement = sa.text(
        """
        SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            COLUMN_NAME,
            DATA_TYPE,
            IS_NULLABLE,
            COLUMN_DEFAULT,
            CHARACTER_MAXIMUM_LENGTH
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE (:schema IS NULL OR TABLE_SCHEMA = :schema)
        ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
        """
    )
    bind_values = {"schema": schema}

    with engine.connect() as connection:
        rows = connection.execute(statement, bind_values)
        # Materialise the rows while the connection is still open.
        return [
            TableColumn(
                table_schema=record.TABLE_SCHEMA,
                table_name=record.TABLE_NAME,
                column_name=record.COLUMN_NAME,
                data_type=record.DATA_TYPE,
                # The query yields the text "YES" for nullable columns.
                is_nullable=(record.IS_NULLABLE == "YES"),
                column_default=record.COLUMN_DEFAULT,
                character_maximum_length=record.CHARACTER_MAXIMUM_LENGTH,
            )
            for record in rows
        ]
|
|
|
|
|
|
def fetch_foreign_keys(engine: Engine, schema: str | None = None) -> List[ForeignKeyRelation]:
    """Read column-level foreign key links from the ``sys`` catalog views.

    Each row of ``sys.foreign_key_columns`` is joined to the tables, schemas
    and columns on both the referencing (parent) and referenced side.

    Args:
        engine: Engine used to open the connection the query runs on.
        schema: When given, only foreign keys whose referencing table lives in
            this schema are returned. ``None`` disables the filter.

    Returns:
        One ``ForeignKeyRelation`` per foreign key column, ordered by
        referencing schema and table, then by constraint and by column
        position within the constraint.
    """
    query = sa.text(
        """
        SELECT
            fk_tab.name AS source_table,
            fk_col.name AS source_column,
            fk_sch.name AS source_schema,
            pk_tab.name AS target_table,
            pk_col.name AS target_column,
            pk_sch.name AS target_schema
        FROM sys.foreign_key_columns fkc
        INNER JOIN sys.tables fk_tab ON fkc.parent_object_id = fk_tab.object_id
        INNER JOIN sys.schemas fk_sch ON fk_tab.schema_id = fk_sch.schema_id
        INNER JOIN sys.columns fk_col ON fkc.parent_object_id = fk_col.object_id AND fkc.parent_column_id = fk_col.column_id
        INNER JOIN sys.tables pk_tab ON fkc.referenced_object_id = pk_tab.object_id
        INNER JOIN sys.schemas pk_sch ON pk_tab.schema_id = pk_sch.schema_id
        INNER JOIN sys.columns pk_col ON fkc.referenced_object_id = pk_col.object_id AND fkc.referenced_column_id = pk_col.column_id
        WHERE (:schema IS NULL OR fk_sch.name = :schema)
        ORDER BY fk_sch.name, fk_tab.name, fkc.constraint_object_id, fkc.constraint_column_id
        """
    )
    # The constraint id and column position in ORDER BY act as tie-breakers:
    # without them the order of several foreign key columns on the same table
    # is unspecified, so the returned list could differ between runs.
    with engine.connect() as conn:
        result = conn.execute(query, {"schema": schema})
        # Materialise the rows while the connection is still open.
        return [
            ForeignKeyRelation(
                source_schema=row.source_schema,
                source_table=row.source_table,
                source_column=row.source_column,
                target_schema=row.target_schema,
                target_table=row.target_table,
                target_column=row.target_column,
            )
            for row in result
        ]
|
|
|
|
|
|
def assemble_schema_document(columns: Iterable[TableColumn], relations: Iterable[ForeignKeyRelation]) -> Dict[str, Any]:
    """Group columns and foreign keys into one per-table document.

    Args:
        columns: Column records; each is appended to the ``"columns"`` list of
            the table named by its ``table_schema`` and ``table_name``.
        relations: Foreign key records; each is appended to the
            ``"relationships"`` list of its source table.

    Returns:
        ``{"tables": {...}}`` where every key is ``"<schema>.<table>"`` and
        every value holds ``"schema"``, ``"name"``, ``"columns"`` and
        ``"relationships"``. A table that appears only as a foreign key source
        still gets an entry, with an empty column list.
    """
    tables: Dict[str, Any] = {}

    def entry_for(schema_name: str, table_name: str) -> Dict[str, Any]:
        # Return the entry for the table, creating an empty one on first use.
        key = f"{schema_name}.{table_name}"
        if key not in tables:
            tables[key] = {
                "schema": schema_name,
                "name": table_name,
                "columns": [],
                "relationships": [],
            }
        return tables[key]

    for column in columns:
        described = {
            "name": column.column_name,
            "type": column.data_type,
            "nullable": column.is_nullable,
            "default": column.column_default,
            "max_length": column.character_maximum_length,
        }
        entry_for(column.table_schema, column.table_name)["columns"].append(described)

    for relation in relations:
        link = {
            "target": f"{relation.target_schema}.{relation.target_table}",
            "source_column": relation.source_column,
            "target_column": relation.target_column,
        }
        entry_for(relation.source_schema, relation.source_table)["relationships"].append(link)

    return {"tables": tables}
|
|
|
|
|
|
def write_schema_json(output_path: Path, schema_doc: Dict[str, Any]) -> None:
    """Serialise *schema_doc* as JSON into the file at *output_path*.

    Missing parent directories are created first. The file is written as
    UTF-8 with two-space indentation and keys sorted.

    Args:
        output_path: Destination file; replaced if it already exists.
        schema_doc: JSON-serialisable document to write.
    """
    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    with open(output_path, mode="w", encoding="utf-8") as handle:
        json.dump(schema_doc, handle, sort_keys=True, indent=2)
|
|
|
|
|
|
def run(schema: str | None = None, output: Path | None = None) -> Path:
    """Extract schema metadata from the database and write it as JSON.

    Settings are loaded with ``DbSettings.from_env()``, columns and foreign
    keys are fetched, merged into one document and written to disk.

    Args:
        schema: Optional schema name forwarded to both fetch functions;
            ``None`` applies no schema filter.
        output: Destination file. Defaults to
            ``db_agent/context/schema.json``, a relative path.

    Returns:
        The path the document was written to.
    """
    settings = DbSettings.from_env()
    engine = create_engine(settings)
    try:
        columns = fetch_columns(engine, schema=schema)
        relations = fetch_foreign_keys(engine, schema=schema)
    finally:
        # The engine is created here and never handed out, so release its
        # connection pool even when a query fails; otherwise every call would
        # leave pooled connections open.
        engine.dispose()

    schema_doc = assemble_schema_document(columns, relations)
    output_path = output or Path("db_agent/context/schema.json")
    write_schema_json(output_path, schema_doc)
    return output_path
|