# controls-web/controls-rework/correlation_analysis.py
from flask import Flask, render_template, request, jsonify
import pandas as pd
import numpy as np
import mysql.connector
from scipy.stats import pearsonr, spearmanr, kendalltau
import plotly.graph_objects as go
import plotly.utils
import json
from datetime import datetime
import io
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings

warnings.filterwarnings('ignore')
app = Flask(__name__)
# Database configuration
DB_CONFIG = {
'host': '192.168.0.13',
'user': 'opce',
'password': 'opcelasuca',
'database': 'archive'
}
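
# A safer configuration sketch (an assumption, not part of the original app):
# read the credentials from the environment instead of hard-coding them. The
# variable names (ARCHIVE_DB_HOST, etc.) are illustrative placeholders.
#
#   import os
#   DB_CONFIG = {
#       'host': os.environ.get('ARCHIVE_DB_HOST', 'localhost'),
#       'user': os.environ.get('ARCHIVE_DB_USER', ''),
#       'password': os.environ.get('ARCHIVE_DB_PASSWORD', ''),
#       'database': os.environ.get('ARCHIVE_DB_NAME', 'archive'),
#   }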
class CorrelationAnalyzer:
def __init__(self):
self.connection = None
def connect_db(self):
"""Connect to database"""
try:
self.connection = mysql.connector.connect(**DB_CONFIG)
return True
except Exception as e:
print(f"Database connection error: {e}")
return False
def get_available_tags(self, min_records=100, days_back=30):
"""Get available tags with sufficient data"""
if not self.connect_db():
return []
query = """
SELECT DISTINCT n.name,
COUNT(*) as record_count,
MIN(h.TimeStamp) as earliest_date,
MAX(h.TimeStamp) as latest_date,
AVG(h.Value) as avg_value,
STDDEV(h.Value) as std_dev
FROM id_names n
INNER JOIN historicaldata h ON n.idnumber = h.ID
WHERE n.name IS NOT NULL
AND n.name != ''
AND h.Value IS NOT NULL
AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
GROUP BY n.name
HAVING record_count >= %s
ORDER BY record_count DESC, n.name ASC
"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            cursor.execute(query, (days_back, min_records))
            tags = cursor.fetchall()
            cursor.close()
            return tags
        except Exception as e:
            print(f"Error fetching tags: {e}")
            return []
        finally:
            # Always release the connection, even if the query fails
            self.connection.close()
def get_synchronized_data(self, tags, hours_back=24, interval_minutes=10):
"""Get synchronized data for correlation analysis"""
if not self.connect_db():
return pd.DataFrame()
# Create time buckets for synchronization
placeholders = ','.join(['%s'] * len(tags))
query = f"""
SELECT
FLOOR(UNIX_TIMESTAMP(h.TimeStamp) / (%s * 60)) * (%s * 60) as time_bucket,
n.name as tag_name,
AVG(h.Value) as avg_value,
COUNT(*) as point_count
FROM historicaldata h
INNER JOIN id_names n ON h.ID = n.idnumber
WHERE n.name IN ({placeholders})
AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
AND h.Value IS NOT NULL
GROUP BY time_bucket, n.name
HAVING point_count >= 1
ORDER BY time_bucket ASC
"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            params = [interval_minutes, interval_minutes] + tags + [hours_back]
            cursor.execute(query, params)
            raw_data = cursor.fetchall()
            cursor.close()
            # Convert to DataFrame
            df = pd.DataFrame(raw_data)
            if df.empty:
                return pd.DataFrame()
            # Pivot to get tags as columns
            pivot_df = df.pivot_table(
                index='time_bucket',
                columns='tag_name',
                values='avg_value',
                aggfunc='mean'
            )
            # Only keep rows with data for all tags
            complete_data = pivot_df.dropna()
            # Add timestamp column derived from the epoch-second time bucket
            complete_data['timestamp'] = pd.to_datetime(complete_data.index, unit='s')
            return complete_data.reset_index(drop=True)
        except Exception as e:
            print(f"Error fetching synchronized data: {e}")
            return pd.DataFrame()
        finally:
            # Always release the connection, even if the query or pivot fails
            self.connection.close()
def calculate_correlation_matrix(self, df, method='pearson', tags=None):
"""Calculate correlation matrix with different methods"""
if df.empty or tags is None:
return pd.DataFrame(), pd.DataFrame()
# Select only the tag columns
data_cols = [col for col in tags if col in df.columns]
if not data_cols:
return pd.DataFrame(), pd.DataFrame()
data = df[data_cols]
        # Calculate correlation matrix (unknown methods fall back to Pearson)
        if method not in ('pearson', 'spearman', 'kendall'):
            method = 'pearson'
        corr_matrix = data.corr(method=method)
        # Calculate p-values on pairwise-complete observations so the two
        # samples stay aligned (independent dropna() calls could misalign them)
        test_func = {'spearman': spearmanr, 'kendall': kendalltau}.get(method, pearsonr)
        p_values = pd.DataFrame(index=corr_matrix.index, columns=corr_matrix.columns, dtype=float)
        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i == j:
                    p_values.loc[col1, col2] = 0.0
                elif i < j:
                    pair = data[[col1, col2]].dropna()
                    _, p_val = test_func(pair[col1], pair[col2])
                    # The test is symmetric, so fill both triangles at once
                    p_values.loc[col1, col2] = p_val
                    p_values.loc[col2, col1] = p_val
        return corr_matrix, p_values
    def calculate_lag_correlations(self, df, tags, max_lag=6, interval_minutes=10):
        """Calculate time-lagged correlations; lags are in sampling buckets"""
if df.empty or len(tags) < 2:
return []
lag_results = []
for i, tag1 in enumerate(tags):
for j, tag2 in enumerate(tags):
if i >= j or tag1 not in df.columns or tag2 not in df.columns:
continue
series1 = df[tag1].dropna()
series2 = df[tag2].dropna()
if len(series1) < 10 or len(series2) < 10:
continue
best_corr = 0
best_lag = 0
                for lag in range(max_lag + 1):
                    if lag == 0:
                        s1, s2 = series1, series2
                    else:
                        # Shift tag1 forward by `lag` buckets relative to tag2
                        s1 = series1[lag:]
                        s2 = series2[:-lag]
                    # Truncate both to a common length so the samples stay paired
                    min_len = min(len(s1), len(s2))
                    if min_len < 10:
                        continue
                    s1 = s1[:min_len]
                    s2 = s2[:min_len]
                    try:
                        corr, _ = pearsonr(s1, s2)
                        if abs(corr) > abs(best_corr):
                            best_corr = corr
                            best_lag = lag
                    except ValueError:
                        continue
                if abs(best_corr) > 0.3:  # Only include meaningful correlations
                    lag_results.append({
                        'tag_x': tag1,
                        'tag_y': tag2,
                        'correlation': best_corr,
                        'lag': best_lag,
                        'lag_minutes': best_lag * interval_minutes
                    })
# Sort by correlation strength
lag_results.sort(key=lambda x: abs(x['correlation']), reverse=True)
return lag_results[:20] # Return top 20
def perform_pca_analysis(self, df, tags):
"""Perform Principal Component Analysis"""
if df.empty or len(tags) < 3:
return None
# Prepare data
data_cols = [col for col in tags if col in df.columns]
data = df[data_cols].dropna()
if data.empty or len(data) < 5:
return None
# Standardize data
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)
# Perform PCA
pca = PCA()
pca_result = pca.fit_transform(scaled_data)
# Get explained variance
explained_variance = pca.explained_variance_ratio_
# Get component loadings
components = pd.DataFrame(
pca.components_.T,
columns=[f'PC{i+1}' for i in range(len(pca.components_))],
index=data.columns
)
return {
'explained_variance': explained_variance.tolist(),
'components': components.to_dict(),
'cumulative_variance': np.cumsum(explained_variance).tolist(),
'n_components': len(explained_variance)
}
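
    # Interpretation note: explained_variance[i] is the fraction of total
    # variance captured by PC(i+1); 'components' maps each 'PC1', 'PC2', ...
    # column to per-tag loadings. For example, cumulative_variance[1] >= 0.9
    # would mean the first two components already explain 90% of the variance.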
def generate_insights(self, corr_matrix, p_values, lag_correlations=None):
"""Generate insights from correlation analysis"""
if corr_matrix.empty:
return {}
insights = {
'strong_positive': [],
'strong_negative': [],
'moderate_correlations': [],
'weak_correlations': [],
'significant_lags': [],
'summary': {}
}
# Analyze correlations
strong_pos_count = 0
strong_neg_count = 0
all_correlations = []
for i, col1 in enumerate(corr_matrix.columns):
for j, col2 in enumerate(corr_matrix.columns):
if i >= j: # Avoid duplicates and self-correlation
continue
corr = corr_matrix.loc[col1, col2]
p_val = p_values.loc[col1, col2] if not p_values.empty else 1.0
all_correlations.append(abs(corr))
if abs(corr) >= 0.7 and p_val < 0.05:
insight = {
'tag_pair': [col1, col2],
'correlation': corr,
'p_value': p_val,
'strength': 'Strong'
}
if corr > 0:
insights['strong_positive'].append(insight)
strong_pos_count += 1
else:
insights['strong_negative'].append(insight)
strong_neg_count += 1
elif 0.3 <= abs(corr) < 0.7:
insights['moderate_correlations'].append({
'tag_pair': [col1, col2],
'correlation': corr,
'p_value': p_val,
'strength': 'Moderate'
})
elif abs(corr) < 0.3:
insights['weak_correlations'].append({
'tag_pair': [col1, col2],
'correlation': corr,
'p_value': p_val,
'strength': 'Weak'
})
# Add lag correlations
if lag_correlations:
insights['significant_lags'] = [
lag for lag in lag_correlations
if abs(lag['correlation']) > 0.5 and lag['lag'] > 0
][:10]
# Summary statistics
insights['summary'] = {
'total_pairs': len(all_correlations),
'strong_positive_count': strong_pos_count,
'strong_negative_count': strong_neg_count,
'average_correlation': np.mean(all_correlations) if all_correlations else 0,
'max_correlation': max(all_correlations) if all_correlations else 0,
'highly_correlated_pairs': strong_pos_count + strong_neg_count
}
return insights
# Initialize analyzer
analyzer = CorrelationAnalyzer()
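
# A minimal standalone sketch of the analyzer outside Flask (assumes the
# archive database above is reachable; 'TAG_A' and 'TAG_B' are hypothetical
# placeholders, not tags known to exist in id_names):
#
#   tags = ['TAG_A', 'TAG_B']
#   df = analyzer.get_synchronized_data(tags, hours_back=24, interval_minutes=10)
#   corr_matrix, p_values = analyzer.calculate_correlation_matrix(df, 'pearson', tags)
#   print(corr_matrix)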
@app.route('/')
def dashboard():
"""Main dashboard"""
return render_template('correlation_dashboard.html')
@app.route('/api/get_tags')
def get_tags():
"""API endpoint to get available tags"""
try:
tags = analyzer.get_available_tags()
return jsonify({
'success': True,
'tags': tags,
'total_count': len(tags)
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/analyze', methods=['POST'])
def analyze_correlations():
"""API endpoint for correlation analysis"""
try:
data = request.json
selected_tags = data.get('tags', [])
time_range = int(data.get('time_range', 24))
sampling_interval = int(data.get('sampling_interval', 10))
correlation_method = data.get('correlation_method', 'pearson')
lag_analysis = data.get('lag_analysis', False)
max_lag = int(data.get('max_lag', 6))
include_pca = data.get('include_pca', False)
if len(selected_tags) < 2:
raise ValueError("Please select at least 2 variables")
if len(selected_tags) > 25:
raise ValueError("Maximum 25 variables allowed")
# Get synchronized data
df = analyzer.get_synchronized_data(selected_tags, time_range, sampling_interval)
if df.empty:
raise ValueError("No synchronized data found for selected variables")
# Calculate correlation matrix
corr_matrix, p_values = analyzer.calculate_correlation_matrix(
df, correlation_method, selected_tags
)
# Calculate lag correlations if requested
lag_correlations = []
if lag_analysis:
            lag_correlations = analyzer.calculate_lag_correlations(
                df, selected_tags, max_lag, sampling_interval
            )
# Perform PCA if requested
pca_results = None
if include_pca and len(selected_tags) >= 3:
pca_results = analyzer.perform_pca_analysis(df, selected_tags)
# Generate insights
insights = analyzer.generate_insights(corr_matrix, p_values, lag_correlations)
# Create correlation heatmap
heatmap_data = create_correlation_heatmap(corr_matrix)
return jsonify({
'success': True,
'correlation_matrix': corr_matrix.to_dict() if not corr_matrix.empty else {},
'p_values': p_values.to_dict() if not p_values.empty else {},
'lag_correlations': lag_correlations,
'insights': insights,
'pca_results': pca_results,
'heatmap_data': heatmap_data,
'data_points': len(df),
'tags': selected_tags,
'analysis_params': {
'time_range': time_range,
'method': correlation_method,
'sampling_interval': sampling_interval
}
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/scatter_plot', methods=['POST'])
def get_scatter_plot():
"""API endpoint for scatter plot data"""
try:
data = request.json
tag_x = data.get('tag_x')
tag_y = data.get('tag_y')
time_range = int(data.get('time_range', 24))
if not tag_x or not tag_y:
raise ValueError("Both X and Y variables must be specified")
# Get data for the two tags
df = analyzer.get_synchronized_data([tag_x, tag_y], time_range, 5)
if df.empty or tag_x not in df.columns or tag_y not in df.columns:
raise ValueError("No data found for the selected variables")
# Create scatter plot
scatter_data = create_scatter_plot(df, tag_x, tag_y)
return jsonify({
'success': True,
'scatter_data': scatter_data,
'tag_x': tag_x,
'tag_y': tag_y
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@app.route('/api/export', methods=['POST'])
def export_data():
"""API endpoint for data export"""
try:
data = request.json
selected_tags = data.get('tags', [])
time_range = int(data.get('time_range', 24))
export_format = data.get('format', 'csv')
if not selected_tags:
raise ValueError("No variables selected for export")
# Get data
df = analyzer.get_synchronized_data(selected_tags, time_range, 5)
if df.empty:
raise ValueError("No data to export")
        # Prepare export data (keep only tags that actually returned data;
        # 'timestamp' is already a datetime column, no conversion needed)
        export_cols = [t for t in selected_tags if t in df.columns]
        export_df = df[export_cols + ['timestamp']].copy()
if export_format == 'csv':
# Create CSV
output = io.StringIO()
export_df.to_csv(output, index=False)
return jsonify({
'success': True,
'data': output.getvalue(),
'filename': f'correlation_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
})
else:
raise ValueError("Unsupported export format")
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
def create_correlation_heatmap(corr_matrix):
"""Create correlation heatmap using Plotly"""
if corr_matrix.empty:
return None
fig = go.Figure(data=go.Heatmap(
z=corr_matrix.values,
x=corr_matrix.columns.tolist(),
y=corr_matrix.index.tolist(),
colorscale='RdBu',
zmid=0,
text=np.round(corr_matrix.values, 3),
texttemplate='%{text}',
textfont={"size": 10},
hoverongaps=False
))
fig.update_layout(
title='Correlation Matrix Heatmap',
xaxis_title='Variables',
yaxis_title='Variables',
width=800,
height=600,
font=dict(size=12)
)
return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
def create_scatter_plot(df, tag_x, tag_y):
"""Create scatter plot using Plotly"""
fig = go.Figure()
# Add scatter points
fig.add_trace(go.Scatter(
x=df[tag_x],
y=df[tag_y],
mode='markers',
marker=dict(
size=6,
color=df.index,
colorscale='Viridis',
showscale=True,
colorbar=dict(title="Time Sequence")
),
text=df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S'),
hovertemplate=f'<b>{tag_x}</b>: %{{x}}<br><b>{tag_y}</b>: %{{y}}<br><b>Time</b>: %{{text}}<extra></extra>'
))
    # Fit the trend line and correlation on pairwise-complete rows so the
    # x and y samples stay aligned (independent dropna() calls would not)
    clean = df[[tag_x, tag_y]].dropna()
    z = np.polyfit(clean[tag_x], clean[tag_y], 1)
    p = np.poly1d(z)
    x_trend = np.linspace(clean[tag_x].min(), clean[tag_x].max(), 100)
    fig.add_trace(go.Scatter(
        x=x_trend,
        y=p(x_trend),
        mode='lines',
        name='Trend Line',
        line=dict(color='red', width=2, dash='dash')
    ))
    # Pearson correlation on the same pairwise-complete data
    corr, p_val = pearsonr(clean[tag_x], clean[tag_y])
fig.update_layout(
title=f'Scatter Plot: {tag_x} vs {tag_y}<br>Correlation: {corr:.3f} (p-value: {p_val:.3f})',
xaxis_title=tag_x,
yaxis_title=tag_y,
width=800,
height=600,
showlegend=True
)
return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
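
# Example call against the analysis endpoint (a sketch; the tag names are
# hypothetical placeholders and must exist in id_names to return data):
#
#   import requests
#   resp = requests.post('http://localhost:5000/api/analyze', json={
#       'tags': ['TAG_A', 'TAG_B'],
#       'time_range': 24,
#       'correlation_method': 'pearson',
#       'lag_analysis': True,
#   })
#   print(resp.json()['insights']['summary'])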