Folder reorganize 1

correlation_analysis.py · 594 lines · new file

@@ -0,0 +1,594 @@
from flask import Flask, render_template, request, jsonify, send_file
import pandas as pd
import numpy as np
import mysql.connector
from scipy import stats
from scipy.stats import pearsonr, spearmanr, kendalltau
import plotly.graph_objects as go
import plotly.express as px
import plotly.utils
import json
from datetime import datetime, timedelta
import io
import base64
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

app = Flask(__name__)

# Database configuration
DB_CONFIG = {
    'host': '192.168.0.13',
    'user': 'opce',
    'password': 'opcelasuca',
    'database': 'archive'
}

class CorrelationAnalyzer:
    def __init__(self):
        self.connection = None

    def connect_db(self):
        """Connect to database"""
        try:
            self.connection = mysql.connector.connect(**DB_CONFIG)
            return True
        except Exception as e:
            print(f"Database connection error: {e}")
            return False

    def get_available_tags(self, min_records=100, days_back=30):
        """Get available tags with sufficient data"""
        if not self.connect_db():
            return []

        query = """
            SELECT DISTINCT n.name,
                   COUNT(*) as record_count,
                   MIN(h.TimeStamp) as earliest_date,
                   MAX(h.TimeStamp) as latest_date,
                   AVG(h.Value) as avg_value,
                   STDDEV(h.Value) as std_dev
            FROM id_names n
            INNER JOIN historicaldata h ON n.idnumber = h.ID
            WHERE n.name IS NOT NULL
              AND n.name != ''
              AND h.Value IS NOT NULL
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY n.name
            HAVING record_count >= %s
            ORDER BY record_count DESC, n.name ASC
        """

        try:
            cursor = self.connection.cursor(dictionary=True)
            cursor.execute(query, (days_back, min_records))
            tags = cursor.fetchall()
            cursor.close()
            self.connection.close()
            return tags
        except Exception as e:
            print(f"Error fetching tags: {e}")
            return []

    def get_synchronized_data(self, tags, hours_back=24, interval_minutes=10):
        """Get synchronized data for correlation analysis"""
        if not self.connect_db():
            return pd.DataFrame()

        # Create time buckets for synchronization
        placeholders = ','.join(['%s'] * len(tags))

        query = f"""
            SELECT
                FLOOR(UNIX_TIMESTAMP(h.TimeStamp) / (%s * 60)) * (%s * 60) as time_bucket,
                n.name as tag_name,
                AVG(h.Value) as avg_value,
                COUNT(*) as point_count
            FROM historicaldata h
            INNER JOIN id_names n ON h.ID = n.idnumber
            WHERE n.name IN ({placeholders})
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
              AND h.Value IS NOT NULL
            GROUP BY time_bucket, n.name
            HAVING point_count >= 1
            ORDER BY time_bucket ASC
        """

        try:
            cursor = self.connection.cursor(dictionary=True)
            params = [interval_minutes, interval_minutes] + tags + [hours_back]
            cursor.execute(query, params)
            raw_data = cursor.fetchall()
            cursor.close()
            self.connection.close()

            # Convert to DataFrame
            df = pd.DataFrame(raw_data)
            if df.empty:
                return pd.DataFrame()

            # Pivot to get tags as columns
            pivot_df = df.pivot_table(
                index='time_bucket',
                columns='tag_name',
                values='avg_value',
                aggfunc='mean'
            )

            # Only keep rows with data for all tags
            complete_data = pivot_df.dropna()

            # Add timestamp column
            complete_data['timestamp'] = pd.to_datetime(complete_data.index, unit='s')

            return complete_data.reset_index(drop=True)

        except Exception as e:
            print(f"Error fetching synchronized data: {e}")
            return pd.DataFrame()

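    # Note on the synchronization above: FLOOR(UNIX_TIMESTAMP(ts) / (interval * 60)) *
    # (interval * 60) snaps each reading to the start of its interval, so with a
    # 10-minute interval a reading at 12:07:31 lands in the 12:00:00 bucket. A minimal
    # sketch of the same bucketing in pandas (hypothetical column names, not part of
    # this module):
    #
    #     bucket = (df['unix_ts'] // (interval_minutes * 60)) * (interval_minutes * 60)
    #     df.groupby([bucket, 'tag_name'])['value'].mean()
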
    def calculate_correlation_matrix(self, df, method='pearson', tags=None):
        """Calculate correlation matrix with different methods"""
        if df.empty or tags is None:
            return pd.DataFrame(), pd.DataFrame()

        # Select only the tag columns
        data_cols = [col for col in tags if col in df.columns]
        if not data_cols:
            return pd.DataFrame(), pd.DataFrame()

        data = df[data_cols]

        # Calculate correlation matrix (fall back to Pearson for unknown methods)
        if method not in ('pearson', 'spearman', 'kendall'):
            method = 'pearson'
        corr_matrix = data.corr(method=method)

        # Calculate p-values
        p_values = pd.DataFrame(index=corr_matrix.index, columns=corr_matrix.columns)

        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i == j:
                    p_values.loc[col1, col2] = 0.0
                else:
                    # Drop rows where either series is missing so both inputs have
                    # equal length (calling dropna() on each column independently
                    # could hand pearsonr/spearmanr misaligned arrays)
                    pair = data[[col1, col2]].dropna()
                    if method == 'spearman':
                        corr, p_val = spearmanr(pair[col1], pair[col2])
                    elif method == 'kendall':
                        corr, p_val = kendalltau(pair[col1], pair[col2])
                    else:
                        corr, p_val = pearsonr(pair[col1], pair[col2])

                    p_values.loc[col1, col2] = p_val

        return corr_matrix, p_values

    def calculate_lag_correlations(self, df, tags, max_lag=6):
        """Calculate time-lagged correlations"""
        if df.empty or len(tags) < 2:
            return []

        lag_results = []

        for i, tag1 in enumerate(tags):
            for j, tag2 in enumerate(tags):
                if i >= j or tag1 not in df.columns or tag2 not in df.columns:
                    continue

                series1 = df[tag1].dropna()
                series2 = df[tag2].dropna()

                if len(series1) < 10 or len(series2) < 10:
                    continue

                best_corr = 0
                best_lag = 0

                for lag in range(max_lag + 1):
                    if lag == 0:
                        # No lag
                        s1, s2 = series1, series2
                    else:
                        # Apply lag
                        if len(series1) <= lag or len(series2) <= lag:
                            continue
                        s1 = series1[lag:]
                        s2 = series2[:-lag]

                    # Truncate both series to a common length so pearsonr
                    # always receives equal-length arrays
                    min_len = min(len(s1), len(s2))
                    s1, s2 = s1[:min_len], s2[:min_len]
                    if min_len < 10:
                        continue

                    try:
                        corr, _ = pearsonr(s1, s2)
                        if abs(corr) > abs(best_corr):
                            best_corr = corr
                            best_lag = lag
                    except Exception:
                        continue

                if abs(best_corr) > 0.3:  # Only include meaningful correlations
                    lag_results.append({
                        'tag_x': tag1,
                        'tag_y': tag2,
                        'correlation': best_corr,
                        'lag': best_lag,
                        'lag_minutes': best_lag * 10  # assumes the default 10-minute sampling interval
                    })

        # Sort by correlation strength
        lag_results.sort(key=lambda x: abs(x['correlation']), reverse=True)
        return lag_results[:20]  # Return top 20

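    # Note on the lag convention above: pairing series1[lag:] with series2[:-lag]
    # compares tag1 at time t against tag2 at time t - lag, so a positive best_lag
    # means changes in tag2 tend to precede changes in tag1 by best_lag buckets
    # (best_lag * interval minutes). For example:
    #
    #     lag = 2:  s1 = [x2, x3, x4, ...],  s2 = [y0, y1, y2, ...]
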
    def perform_pca_analysis(self, df, tags):
        """Perform Principal Component Analysis"""
        if df.empty or len(tags) < 3:
            return None

        # Prepare data
        data_cols = [col for col in tags if col in df.columns]
        data = df[data_cols].dropna()

        if data.empty or len(data) < 5:
            return None

        # Standardize data
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)

        # Perform PCA
        pca = PCA()
        pca_result = pca.fit_transform(scaled_data)

        # Get explained variance
        explained_variance = pca.explained_variance_ratio_

        # Get component loadings
        components = pd.DataFrame(
            pca.components_.T,
            columns=[f'PC{i+1}' for i in range(len(pca.components_))],
            index=data.columns
        )

        return {
            'explained_variance': explained_variance.tolist(),
            'components': components.to_dict(),
            'cumulative_variance': np.cumsum(explained_variance).tolist(),
            'n_components': len(explained_variance)
        }

    def generate_insights(self, corr_matrix, p_values, lag_correlations=None):
        """Generate insights from correlation analysis"""
        if corr_matrix.empty:
            return {}

        insights = {
            'strong_positive': [],
            'strong_negative': [],
            'moderate_correlations': [],
            'weak_correlations': [],
            'significant_lags': [],
            'summary': {}
        }

        # Analyze correlations
        strong_pos_count = 0
        strong_neg_count = 0
        all_correlations = []

        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i >= j:  # Avoid duplicates and self-correlation
                    continue

                corr = corr_matrix.loc[col1, col2]
                p_val = p_values.loc[col1, col2] if not p_values.empty else 1.0

                all_correlations.append(abs(corr))

                if abs(corr) >= 0.7 and p_val < 0.05:
                    insight = {
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Strong'
                    }

                    if corr > 0:
                        insights['strong_positive'].append(insight)
                        strong_pos_count += 1
                    else:
                        insights['strong_negative'].append(insight)
                        strong_neg_count += 1

                elif 0.3 <= abs(corr) < 0.7:
                    insights['moderate_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Moderate'
                    })
                elif abs(corr) < 0.3:
                    insights['weak_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Weak'
                    })

        # Add lag correlations
        if lag_correlations:
            insights['significant_lags'] = [
                lag for lag in lag_correlations
                if abs(lag['correlation']) > 0.5 and lag['lag'] > 0
            ][:10]

        # Summary statistics
        insights['summary'] = {
            'total_pairs': len(all_correlations),
            'strong_positive_count': strong_pos_count,
            'strong_negative_count': strong_neg_count,
            'average_correlation': np.mean(all_correlations) if all_correlations else 0,
            'max_correlation': max(all_correlations) if all_correlations else 0,
            'highly_correlated_pairs': strong_pos_count + strong_neg_count
        }

        return insights
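
# A minimal standalone sketch of using CorrelationAnalyzer outside Flask
# (hypothetical tag names; assumes the MySQL instance in DB_CONFIG is reachable):
#
#     analyzer = CorrelationAnalyzer()
#     df = analyzer.get_synchronized_data(['TAG_A', 'TAG_B'], hours_back=24, interval_minutes=10)
#     corr, pvals = analyzer.calculate_correlation_matrix(df, method='spearman', tags=['TAG_A', 'TAG_B'])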

# Initialize analyzer
analyzer = CorrelationAnalyzer()

@app.route('/')
def dashboard():
    """Main dashboard"""
    return render_template('correlation_dashboard.html')

@app.route('/api/get_tags')
def get_tags():
    """API endpoint to get available tags"""
    try:
        tags = analyzer.get_available_tags()
        return jsonify({
            'success': True,
            'tags': tags,
            'total_count': len(tags)
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

@app.route('/api/analyze', methods=['POST'])
def analyze_correlations():
    """API endpoint for correlation analysis"""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        sampling_interval = int(data.get('sampling_interval', 10))
        correlation_method = data.get('correlation_method', 'pearson')
        lag_analysis = data.get('lag_analysis', False)
        max_lag = int(data.get('max_lag', 6))
        include_pca = data.get('include_pca', False)

        if len(selected_tags) < 2:
            raise ValueError("Please select at least 2 variables")

        if len(selected_tags) > 25:
            raise ValueError("Maximum 25 variables allowed")

        # Get synchronized data
        df = analyzer.get_synchronized_data(selected_tags, time_range, sampling_interval)

        if df.empty:
            raise ValueError("No synchronized data found for selected variables")

        # Calculate correlation matrix
        corr_matrix, p_values = analyzer.calculate_correlation_matrix(
            df, correlation_method, selected_tags
        )

        # Calculate lag correlations if requested
        lag_correlations = []
        if lag_analysis:
            lag_correlations = analyzer.calculate_lag_correlations(df, selected_tags, max_lag)

        # Perform PCA if requested
        pca_results = None
        if include_pca and len(selected_tags) >= 3:
            pca_results = analyzer.perform_pca_analysis(df, selected_tags)

        # Generate insights
        insights = analyzer.generate_insights(corr_matrix, p_values, lag_correlations)

        # Create correlation heatmap
        heatmap_data = create_correlation_heatmap(corr_matrix)

        return jsonify({
            'success': True,
            'correlation_matrix': corr_matrix.to_dict() if not corr_matrix.empty else {},
            'p_values': p_values.to_dict() if not p_values.empty else {},
            'lag_correlations': lag_correlations,
            'insights': insights,
            'pca_results': pca_results,
            'heatmap_data': heatmap_data,
            'data_points': len(df),
            'tags': selected_tags,
            'analysis_params': {
                'time_range': time_range,
                'method': correlation_method,
                'sampling_interval': sampling_interval
            }
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

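# Example request against /api/analyze (hypothetical tag names; assumes the app is
# running locally on port 5000 and the `requests` package is installed):
#
#     import requests
#     resp = requests.post('http://localhost:5000/api/analyze', json={
#         'tags': ['TAG_A', 'TAG_B', 'TAG_C'],
#         'time_range': 24,
#         'correlation_method': 'pearson',
#         'lag_analysis': True,
#         'max_lag': 6,
#         'include_pca': True,
#     })
#     print(resp.json()['insights']['summary'])
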
@app.route('/api/scatter_plot', methods=['POST'])
def get_scatter_plot():
    """API endpoint for scatter plot data"""
    try:
        data = request.json
        tag_x = data.get('tag_x')
        tag_y = data.get('tag_y')
        time_range = int(data.get('time_range', 24))

        if not tag_x or not tag_y:
            raise ValueError("Both X and Y variables must be specified")

        # Get data for the two tags
        df = analyzer.get_synchronized_data([tag_x, tag_y], time_range, 5)

        if df.empty or tag_x not in df.columns or tag_y not in df.columns:
            raise ValueError("No data found for the selected variables")

        # Create scatter plot
        scatter_data = create_scatter_plot(df, tag_x, tag_y)

        return jsonify({
            'success': True,
            'scatter_data': scatter_data,
            'tag_x': tag_x,
            'tag_y': tag_y
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

@app.route('/api/export', methods=['POST'])
def export_data():
    """API endpoint for data export"""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        export_format = data.get('format', 'csv')

        if not selected_tags:
            raise ValueError("No variables selected for export")

        # Get data
        df = analyzer.get_synchronized_data(selected_tags, time_range, 5)

        if df.empty:
            raise ValueError("No data to export")

        # Prepare export data; only keep tags that actually returned data,
        # so a tag with no rows does not raise a KeyError on column selection
        export_cols = [t for t in selected_tags if t in df.columns]
        export_df = df[export_cols + ['timestamp']].copy()
        export_df['timestamp'] = pd.to_datetime(export_df['timestamp'])

        if export_format == 'csv':
            # Create CSV
            output = io.StringIO()
            export_df.to_csv(output, index=False)

            return jsonify({
                'success': True,
                'data': output.getvalue(),
                'filename': f'correlation_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
            })

        else:
            raise ValueError("Unsupported export format")

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

def create_correlation_heatmap(corr_matrix):
    """Create correlation heatmap using Plotly"""
    if corr_matrix.empty:
        return None

    fig = go.Figure(data=go.Heatmap(
        z=corr_matrix.values,
        x=corr_matrix.columns.tolist(),
        y=corr_matrix.index.tolist(),
        colorscale='RdBu',
        zmid=0,
        text=np.round(corr_matrix.values, 3),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    ))

    fig.update_layout(
        title='Correlation Matrix Heatmap',
        xaxis_title='Variables',
        yaxis_title='Variables',
        width=800,
        height=600,
        font=dict(size=12)
    )

    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

def create_scatter_plot(df, tag_x, tag_y):
    """Create scatter plot using Plotly"""
    fig = go.Figure()

    # Drop rows where either variable is missing so the trend line and the
    # correlation are computed on aligned, equal-length arrays (calling
    # dropna() on each column separately could misalign them)
    plot_df = df[[tag_x, tag_y, 'timestamp']].dropna()

    # Add scatter points
    fig.add_trace(go.Scatter(
        x=plot_df[tag_x],
        y=plot_df[tag_y],
        mode='markers',
        marker=dict(
            size=6,
            color=plot_df.index,
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Time Sequence")
        ),
        text=plot_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S'),
        hovertemplate=f'<b>{tag_x}</b>: %{{x}}<br><b>{tag_y}</b>: %{{y}}<br><b>Time</b>: %{{text}}<extra></extra>'
    ))

    # Add trend line
    z = np.polyfit(plot_df[tag_x], plot_df[tag_y], 1)
    p = np.poly1d(z)
    x_trend = np.linspace(plot_df[tag_x].min(), plot_df[tag_x].max(), 100)

    fig.add_trace(go.Scatter(
        x=x_trend,
        y=p(x_trend),
        mode='lines',
        name='Trend Line',
        line=dict(color='red', width=2, dash='dash')
    ))

    # Calculate correlation
    corr, p_val = pearsonr(plot_df[tag_x], plot_df[tag_y])

    fig.update_layout(
        title=f'Scatter Plot: {tag_x} vs {tag_y}<br>Correlation: {corr:.3f} (p-value: {p_val:.3f})',
        xaxis_title=tag_x,
        yaxis_title=tag_y,
        width=800,
        height=600,
        showlegend=True
    )

    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
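
# To run locally (assumes Flask, pandas, scipy, scikit-learn, plotly and the
# MySQL connector are installed):
#
#     python correlation_analysis.py
#
# then open http://localhost:5000/ for the dashboard.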