from flask import Flask, render_template, request, jsonify
import pandas as pd
import numpy as np
import mysql.connector
from scipy.stats import pearsonr, spearmanr, kendalltau
import plotly.graph_objects as go
import plotly.utils
import json
from datetime import datetime
import io
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings

warnings.filterwarnings('ignore')

app = Flask(__name__)

# Database configuration
DB_CONFIG = {
    'host': '192.168.0.13',
    'user': 'opce',
    'password': 'opcelasuca',
    'database': 'archive'
}

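# The credentials above are hardcoded, which is workable on a closed network
# but keeps secrets in version control. A minimal sketch of an alternative
# using only the standard library; the ARCHIVE_DB_* variable names are
# illustrative, not an existing convention of this project:
import os

def _db_config_from_env(defaults=DB_CONFIG):
    """Build the DB config from environment variables, falling back to the
    hardcoded defaults above when a variable is not set."""
    return {
        'host': os.environ.get('ARCHIVE_DB_HOST', defaults['host']),
        'user': os.environ.get('ARCHIVE_DB_USER', defaults['user']),
        'password': os.environ.get('ARCHIVE_DB_PASSWORD', defaults['password']),
        'database': os.environ.get('ARCHIVE_DB_NAME', defaults['database']),
    }
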
class CorrelationAnalyzer:
    def __init__(self):
        self.connection = None

    def connect_db(self):
        """Connect to database"""
        try:
            self.connection = mysql.connector.connect(**DB_CONFIG)
            return True
        except Exception as e:
            print(f"Database connection error: {e}")
            return False

    def get_available_tags(self, min_records=100, days_back=30):
        """Get available tags with sufficient data"""
        if not self.connect_db():
            return []

        query = """
            SELECT DISTINCT n.name,
                   COUNT(*) as record_count,
                   MIN(h.TimeStamp) as earliest_date,
                   MAX(h.TimeStamp) as latest_date,
                   AVG(h.Value) as avg_value,
                   STDDEV(h.Value) as std_dev
            FROM id_names n
            INNER JOIN historicaldata h ON n.idnumber = h.ID
            WHERE n.name IS NOT NULL
              AND n.name != ''
              AND h.Value IS NOT NULL
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY n.name
            HAVING record_count >= %s
            ORDER BY record_count DESC, n.name ASC
        """

        try:
            cursor = self.connection.cursor(dictionary=True)
            cursor.execute(query, (days_back, min_records))
            tags = cursor.fetchall()
            cursor.close()
            return tags
        except Exception as e:
            print(f"Error fetching tags: {e}")
            return []
        finally:
            # Close the connection even when the query fails
            self.connection.close()

    def get_synchronized_data(self, tags, hours_back=24, interval_minutes=10):
        """Get synchronized data for correlation analysis"""
        if not self.connect_db():
            return pd.DataFrame()

        # Create time buckets for synchronization
        placeholders = ','.join(['%s'] * len(tags))

        query = f"""
            SELECT
                FLOOR(UNIX_TIMESTAMP(h.TimeStamp) / (%s * 60)) * (%s * 60) as time_bucket,
                n.name as tag_name,
                AVG(h.Value) as avg_value,
                COUNT(*) as point_count
            FROM historicaldata h
            INNER JOIN id_names n ON h.ID = n.idnumber
            WHERE n.name IN ({placeholders})
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
              AND h.Value IS NOT NULL
            GROUP BY time_bucket, n.name
            HAVING point_count >= 1
            ORDER BY time_bucket ASC
        """

        try:
            cursor = self.connection.cursor(dictionary=True)
            params = [interval_minutes, interval_minutes] + tags + [hours_back]
            cursor.execute(query, params)
            raw_data = cursor.fetchall()
            cursor.close()

            # Convert to DataFrame
            df = pd.DataFrame(raw_data)
            if df.empty:
                return pd.DataFrame()

            # Pivot to get tags as columns
            pivot_df = df.pivot_table(
                index='time_bucket',
                columns='tag_name',
                values='avg_value',
                aggfunc='mean'
            )

            # Only keep rows with data for all tags; copy so adding the
            # timestamp column below does not write into a view
            complete_data = pivot_df.dropna().copy()

            # Add timestamp column (time_bucket is in Unix seconds)
            complete_data['timestamp'] = pd.to_datetime(complete_data.index, unit='s')

            return complete_data.reset_index(drop=True)

        except Exception as e:
            print(f"Error fetching synchronized data: {e}")
            return pd.DataFrame()
        finally:
            self.connection.close()

    def calculate_correlation_matrix(self, df, method='pearson', tags=None):
        """Calculate correlation matrix and p-values with different methods"""
        if df.empty or tags is None:
            return pd.DataFrame(), pd.DataFrame()

        # Select only the tag columns
        data_cols = [col for col in tags if col in df.columns]
        if not data_cols:
            return pd.DataFrame(), pd.DataFrame()

        data = df[data_cols]

        # Calculate correlation matrix (fall back to Pearson for anything
        # unrecognized; pandas supports all three methods directly)
        if method not in ('pearson', 'spearman', 'kendall'):
            method = 'pearson'
        corr_matrix = data.corr(method=method)

        # Calculate p-values pairwise, dropping rows where either column is
        # missing so both series stay aligned and equal in length
        test_func = {'spearman': spearmanr, 'kendall': kendalltau}.get(method, pearsonr)
        p_values = pd.DataFrame(index=corr_matrix.index, columns=corr_matrix.columns)

        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i == j:
                    p_values.loc[col1, col2] = 0.0
                else:
                    pair = data[[col1, col2]].dropna()
                    _, p_val = test_func(pair[col1], pair[col2])
                    p_values.loc[col1, col2] = p_val

        return corr_matrix, p_values

    def calculate_lag_correlations(self, df, tags, max_lag=6):
        """Calculate time-lagged correlations"""
        if df.empty or len(tags) < 2:
            return []

        lag_results = []

        for i, tag1 in enumerate(tags):
            for j, tag2 in enumerate(tags):
                if i >= j or tag1 not in df.columns or tag2 not in df.columns:
                    continue

                # Work on aligned arrays trimmed to a common length so
                # pearsonr always receives equally sized inputs
                series1 = df[tag1].dropna().to_numpy()
                series2 = df[tag2].dropna().to_numpy()
                min_len = min(len(series1), len(series2))
                series1, series2 = series1[:min_len], series2[:min_len]

                if min_len < 10:
                    continue

                best_corr = 0.0
                best_lag = 0

                for lag in range(max_lag + 1):
                    if lag == 0:
                        # No lag
                        s1, s2 = series1, series2
                    else:
                        # Apply lag: offset series1 by `lag` buckets
                        if min_len <= lag:
                            continue
                        s1 = series1[lag:]
                        s2 = series2[:-lag]

                    if len(s1) < 10:
                        continue

                    try:
                        corr, _ = pearsonr(s1, s2)
                        if abs(corr) > abs(best_corr):
                            best_corr = corr
                            best_lag = lag
                    except Exception:
                        continue

                if abs(best_corr) > 0.3:  # Only include meaningful correlations
                    lag_results.append({
                        'tag_x': tag1,
                        'tag_y': tag2,
                        'correlation': best_corr,
                        'lag': best_lag,
                        'lag_minutes': best_lag * 10  # Assuming 10-minute intervals
                    })

        # Sort by correlation strength
        lag_results.sort(key=lambda x: abs(x['correlation']), reverse=True)
        return lag_results[:20]  # Return top 20

    def perform_pca_analysis(self, df, tags):
        """Perform Principal Component Analysis"""
        if df.empty or len(tags) < 3:
            return None

        # Prepare data
        data_cols = [col for col in tags if col in df.columns]
        data = df[data_cols].dropna()

        if data.empty or len(data) < 5:
            return None

        # Standardize data so each tag contributes on the same scale
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)

        # Perform PCA
        pca = PCA()
        pca.fit_transform(scaled_data)

        # Get explained variance
        explained_variance = pca.explained_variance_ratio_

        # Get component loadings (rows: tags, columns: principal components)
        components = pd.DataFrame(
            pca.components_.T,
            columns=[f'PC{i + 1}' for i in range(len(pca.components_))],
            index=data.columns
        )

        return {
            'explained_variance': explained_variance.tolist(),
            'components': components.to_dict(),
            'cumulative_variance': np.cumsum(explained_variance).tolist(),
            'n_components': len(explained_variance)
        }

    def generate_insights(self, corr_matrix, p_values, lag_correlations=None):
        """Generate insights from correlation analysis"""
        if corr_matrix.empty:
            return {}

        insights = {
            'strong_positive': [],
            'strong_negative': [],
            'moderate_correlations': [],
            'weak_correlations': [],
            'significant_lags': [],
            'summary': {}
        }

        # Analyze correlations
        strong_pos_count = 0
        strong_neg_count = 0
        all_correlations = []

        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i >= j:  # Avoid duplicates and self-correlation
                    continue

                corr = corr_matrix.loc[col1, col2]
                p_val = p_values.loc[col1, col2] if not p_values.empty else 1.0

                all_correlations.append(abs(corr))

                if abs(corr) >= 0.7 and p_val < 0.05:
                    insight = {
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Strong'
                    }

                    if corr > 0:
                        insights['strong_positive'].append(insight)
                        strong_pos_count += 1
                    else:
                        insights['strong_negative'].append(insight)
                        strong_neg_count += 1

                elif 0.3 <= abs(corr) < 0.7:
                    insights['moderate_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Moderate'
                    })
                elif abs(corr) < 0.3:
                    insights['weak_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Weak'
                    })

        # Add lag correlations
        if lag_correlations:
            insights['significant_lags'] = [
                lag for lag in lag_correlations
                if abs(lag['correlation']) > 0.5 and lag['lag'] > 0
            ][:10]

        # Summary statistics
        insights['summary'] = {
            'total_pairs': len(all_correlations),
            'strong_positive_count': strong_pos_count,
            'strong_negative_count': strong_neg_count,
            'average_correlation': np.mean(all_correlations) if all_correlations else 0,
            'max_correlation': max(all_correlations) if all_correlations else 0,
            'highly_correlated_pairs': strong_pos_count + strong_neg_count
        }

        return insights

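# A minimal offline sketch showing how the analyzer's statistics methods can
# be exercised without a database: the tag names and the synthetic DataFrame
# are illustrative only, and the function is never called by the app.
def _example_offline_analysis():
    rng = np.random.default_rng(0)
    base = rng.normal(size=200)
    demo_df = pd.DataFrame({
        'temp_sensor': base,
        'pressure_sensor': 0.8 * base + rng.normal(scale=0.5, size=200),
        'flow_sensor': rng.normal(size=200),
    })
    demo_tags = ['temp_sensor', 'pressure_sensor', 'flow_sensor']
    demo = CorrelationAnalyzer()
    corr, pvals = demo.calculate_correlation_matrix(demo_df, 'pearson', demo_tags)
    lags = demo.calculate_lag_correlations(demo_df, demo_tags)
    return demo.generate_insights(corr, pvals, lags)
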
# Initialize analyzer
analyzer = CorrelationAnalyzer()

@app.route('/')
def dashboard():
    """Main dashboard"""
    return render_template('correlation_dashboard.html')

@app.route('/api/get_tags')
def get_tags():
    """API endpoint to get available tags"""
    try:
        tags = analyzer.get_available_tags()
        return jsonify({
            'success': True,
            'tags': tags,
            'total_count': len(tags)
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

@app.route('/api/analyze', methods=['POST'])
def analyze_correlations():
    """API endpoint for correlation analysis"""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        sampling_interval = int(data.get('sampling_interval', 10))
        correlation_method = data.get('correlation_method', 'pearson')
        lag_analysis = data.get('lag_analysis', False)
        max_lag = int(data.get('max_lag', 6))
        include_pca = data.get('include_pca', False)

        if len(selected_tags) < 2:
            raise ValueError("Please select at least 2 variables")

        if len(selected_tags) > 25:
            raise ValueError("Maximum 25 variables allowed")

        # Get synchronized data
        df = analyzer.get_synchronized_data(selected_tags, time_range, sampling_interval)

        if df.empty:
            raise ValueError("No synchronized data found for selected variables")

        # Calculate correlation matrix
        corr_matrix, p_values = analyzer.calculate_correlation_matrix(
            df, correlation_method, selected_tags
        )

        # Calculate lag correlations if requested
        lag_correlations = []
        if lag_analysis:
            lag_correlations = analyzer.calculate_lag_correlations(df, selected_tags, max_lag)

        # Perform PCA if requested
        pca_results = None
        if include_pca and len(selected_tags) >= 3:
            pca_results = analyzer.perform_pca_analysis(df, selected_tags)

        # Generate insights
        insights = analyzer.generate_insights(corr_matrix, p_values, lag_correlations)

        # Create correlation heatmap
        heatmap_data = create_correlation_heatmap(corr_matrix)

        return jsonify({
            'success': True,
            'correlation_matrix': corr_matrix.to_dict() if not corr_matrix.empty else {},
            'p_values': p_values.to_dict() if not p_values.empty else {},
            'lag_correlations': lag_correlations,
            'insights': insights,
            'pca_results': pca_results,
            'heatmap_data': heatmap_data,
            'data_points': len(df),
            'tags': selected_tags,
            'analysis_params': {
                'time_range': time_range,
                'method': correlation_method,
                'sampling_interval': sampling_interval
            }
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

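# A minimal sketch of calling the endpoint above from a Python client,
# assuming the app runs at http://localhost:5000 and the `requests` package
# is installed; the tag names are placeholders. Not called by the app.
def _example_analyze_request():
    import requests
    resp = requests.post('http://localhost:5000/api/analyze', json={
        'tags': ['temp_sensor', 'pressure_sensor'],  # placeholder tag names
        'time_range': 24,            # hours of history
        'sampling_interval': 10,     # bucket size in minutes
        'correlation_method': 'pearson',
        'lag_analysis': True,
        'max_lag': 6,
        'include_pca': False,
    })
    return resp.json()
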
@app.route('/api/scatter_plot', methods=['POST'])
def get_scatter_plot():
    """API endpoint for scatter plot data"""
    try:
        data = request.json
        tag_x = data.get('tag_x')
        tag_y = data.get('tag_y')
        time_range = int(data.get('time_range', 24))

        if not tag_x or not tag_y:
            raise ValueError("Both X and Y variables must be specified")

        # Get data for the two tags at 5-minute resolution
        df = analyzer.get_synchronized_data([tag_x, tag_y], time_range, 5)

        if df.empty or tag_x not in df.columns or tag_y not in df.columns:
            raise ValueError("No data found for the selected variables")

        # Create scatter plot
        scatter_data = create_scatter_plot(df, tag_x, tag_y)

        return jsonify({
            'success': True,
            'scatter_data': scatter_data,
            'tag_x': tag_x,
            'tag_y': tag_y
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

@app.route('/api/export', methods=['POST'])
def export_data():
    """API endpoint for data export"""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        export_format = data.get('format', 'csv')

        if not selected_tags:
            raise ValueError("No variables selected for export")

        # Get data at 5-minute resolution
        df = analyzer.get_synchronized_data(selected_tags, time_range, 5)

        if df.empty:
            raise ValueError("No data to export")

        # Prepare export data; keep only the tags actually present in the
        # result so a missing tag cannot raise a KeyError
        export_cols = [tag for tag in selected_tags if tag in df.columns]
        export_df = df[export_cols + ['timestamp']].copy()
        export_df['timestamp'] = pd.to_datetime(export_df['timestamp'])

        if export_format == 'csv':
            # Create CSV in memory
            output = io.StringIO()
            export_df.to_csv(output, index=False)

            return jsonify({
                'success': True,
                'data': output.getvalue(),
                'filename': f'correlation_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
            })

        else:
            raise ValueError("Unsupported export format")

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

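# If formats beyond CSV are wanted, pandas can also write Excel in memory.
# A hedged sketch of what an 'xlsx' branch could call, assuming an engine
# such as openpyxl is installed; not wired into the route above. The bytes
# would need base64-encoding to travel inside a JSON response.
def _export_excel_bytes(export_df):
    buffer = io.BytesIO()
    export_df.to_excel(buffer, index=False)  # engine resolved by pandas
    return buffer.getvalue()
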
def create_correlation_heatmap(corr_matrix):
    """Create correlation heatmap using Plotly"""
    if corr_matrix.empty:
        return None

    fig = go.Figure(data=go.Heatmap(
        z=corr_matrix.values,
        x=corr_matrix.columns.tolist(),
        y=corr_matrix.index.tolist(),
        colorscale='RdBu',
        zmid=0,
        text=np.round(corr_matrix.values, 3),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    ))

    fig.update_layout(
        title='Correlation Matrix Heatmap',
        xaxis_title='Variables',
        yaxis_title='Variables',
        width=800,
        height=600,
        font=dict(size=12)
    )

    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

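# The JSON string returned above is intended for Plotly.js on the dashboard,
# but it can be rehydrated server-side for debugging. A small sketch using
# plotly.io (imported locally since the app itself does not need it):
def _save_heatmap_html(fig_json, path='heatmap_debug.html'):
    import plotly.io as pio
    fig = pio.from_json(fig_json)  # rebuild the Figure from its JSON form
    fig.write_html(path)
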
def create_scatter_plot(df, tag_x, tag_y):
    """Create scatter plot using Plotly"""
    fig = go.Figure()

    # Drop rows where either tag is missing so the two series stay aligned
    # for the trend fit and the correlation below
    clean = df[[tag_x, tag_y]].dropna()

    # Add scatter points, colored by time sequence
    fig.add_trace(go.Scatter(
        x=df[tag_x],
        y=df[tag_y],
        mode='markers',
        marker=dict(
            size=6,
            color=df.index,
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Time Sequence")
        ),
        text=df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S'),
        hovertemplate=f'<b>{tag_x}</b>: %{{x}}<br><b>{tag_y}</b>: %{{y}}<br><b>Time</b>: %{{text}}<extra></extra>'
    ))

    # Add least-squares trend line
    z = np.polyfit(clean[tag_x], clean[tag_y], 1)
    p = np.poly1d(z)
    x_trend = np.linspace(clean[tag_x].min(), clean[tag_x].max(), 100)

    fig.add_trace(go.Scatter(
        x=x_trend,
        y=p(x_trend),
        mode='lines',
        name='Trend Line',
        line=dict(color='red', width=2, dash='dash')
    ))

    # Calculate correlation on the aligned data
    corr, p_val = pearsonr(clean[tag_x], clean[tag_y])

    fig.update_layout(
        title=f'Scatter Plot: {tag_x} vs {tag_y}<br>Correlation: {corr:.3f} (p-value: {p_val:.3f})',
        xaxis_title=tag_x,
        yaxis_title=tag_y,
        width=800,
        height=600,
        showlegend=True
    )

    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

if __name__ == '__main__':
    # Note: debug mode with host='0.0.0.0' exposes the Werkzeug debugger to
    # the whole network; acceptable on a trusted lab segment, not in production.
    app.run(debug=True, host='0.0.0.0', port=5000)