from flask import Flask, render_template, request, jsonify
import pandas as pd
import numpy as np
import mysql.connector
from scipy.stats import pearsonr, spearmanr, kendalltau
import plotly.graph_objects as go
import plotly.utils
import json
from datetime import datetime
import io
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings

warnings.filterwarnings('ignore')

app = Flask(__name__)

# Database configuration
DB_CONFIG = {
    'host': '192.168.0.13',
    'user': 'opce',
    'password': 'opcelasuca',
    'database': 'archive'
}
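import os

# Optionally override the hardcoded credentials from the environment.
# A minimal sketch: the ARCHIVE_DB_* variable names are assumptions, not part
# of the original application, and this block is a no-op when they are unset.
for _key, _env in (('host', 'ARCHIVE_DB_HOST'),
                   ('user', 'ARCHIVE_DB_USER'),
                   ('password', 'ARCHIVE_DB_PASSWORD'),
                   ('database', 'ARCHIVE_DB_NAME')):
    if os.environ.get(_env):
        DB_CONFIG[_key] = os.environ[_env]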
class CorrelationAnalyzer:
    def __init__(self):
        self.connection = None

    def connect_db(self):
        """Connect to the database."""
        try:
            self.connection = mysql.connector.connect(**DB_CONFIG)
            return True
        except Exception as e:
            print(f"Database connection error: {e}")
            return False

    def get_available_tags(self, min_records=100, days_back=30):
        """Get available tags with sufficient data."""
        if not self.connect_db():
            return []
        query = """
            SELECT DISTINCT
                n.name,
                COUNT(*) as record_count,
                MIN(h.TimeStamp) as earliest_date,
                MAX(h.TimeStamp) as latest_date,
                AVG(h.Value) as avg_value,
                STDDEV(h.Value) as std_dev
            FROM id_names n
            INNER JOIN historicaldata h ON n.idnumber = h.ID
            WHERE n.name IS NOT NULL
              AND n.name != ''
              AND h.Value IS NOT NULL
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s DAY)
            GROUP BY n.name
            HAVING record_count >= %s
            ORDER BY record_count DESC, n.name ASC
        """
        try:
            cursor = self.connection.cursor(dictionary=True)
            cursor.execute(query, (days_back, min_records))
            tags = cursor.fetchall()
            cursor.close()
            self.connection.close()
            return tags
        except Exception as e:
            print(f"Error fetching tags: {e}")
            return []

    def get_synchronized_data(self, tags, hours_back=24, interval_minutes=10):
        """Get synchronized data for correlation analysis."""
        if not self.connect_db():
            return pd.DataFrame()

        # Create time buckets for synchronization
        placeholders = ','.join(['%s'] * len(tags))
        query = f"""
            SELECT
                FLOOR(UNIX_TIMESTAMP(h.TimeStamp) / (%s * 60)) * (%s * 60) as time_bucket,
                n.name as tag_name,
                AVG(h.Value) as avg_value,
                COUNT(*) as point_count
            FROM historicaldata h
            INNER JOIN id_names n ON h.ID = n.idnumber
            WHERE n.name IN ({placeholders})
              AND h.TimeStamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
              AND h.Value IS NOT NULL
            GROUP BY time_bucket, n.name
            HAVING point_count >= 1
            ORDER BY time_bucket ASC
        """
        try:
            cursor = self.connection.cursor(dictionary=True)
            params = [interval_minutes, interval_minutes] + list(tags) + [hours_back]
            cursor.execute(query, params)
            raw_data = cursor.fetchall()
            cursor.close()
            self.connection.close()

            # Convert to DataFrame
            df = pd.DataFrame(raw_data)
            if df.empty:
                return pd.DataFrame()

            # Pivot to get tags as columns
            pivot_df = df.pivot_table(
                index='time_bucket',
                columns='tag_name',
                values='avg_value',
                aggfunc='mean'
            )

            # Only keep rows with data for all tags
            complete_data = pivot_df.dropna().copy()

            # Add timestamp column
            complete_data['timestamp'] = pd.to_datetime(complete_data.index, unit='s')
            return complete_data.reset_index(drop=True)
        except Exception as e:
            print(f"Error fetching synchronized data: {e}")
            return pd.DataFrame()

    def calculate_correlation_matrix(self, df, method='pearson', tags=None):
        """Calculate a correlation matrix and matching p-values."""
        if df.empty or tags is None:
            return pd.DataFrame(), pd.DataFrame()

        # Select only the tag columns that actually came back with data
        data_cols = [col for col in tags if col in df.columns]
        if not data_cols:
            return pd.DataFrame(), pd.DataFrame()
        data = df[data_cols]

        # pandas supports all three methods directly; fall back to Pearson
        if method not in ('pearson', 'spearman', 'kendall'):
            method = 'pearson'
        corr_matrix = data.corr(method=method)

        # Calculate p-values pair by pair with the matching scipy test
        test_funcs = {'pearson': pearsonr, 'spearman': spearmanr, 'kendall': kendalltau}
        test = test_funcs[method]
        p_values = pd.DataFrame(index=corr_matrix.index, columns=corr_matrix.columns)
        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i == j:
                    p_values.loc[col1, col2] = 0.0
                    continue
                # Drop NaN rows pairwise so both series stay aligned and equal-length
                pair = data[[col1, col2]].dropna()
                if len(pair) < 3:
                    p_values.loc[col1, col2] = 1.0
                    continue
                _, p_val = test(pair[col1], pair[col2])
                p_values.loc[col1, col2] = float(p_val)
        return corr_matrix, p_values

    def calculate_lag_correlations(self, df, tags, max_lag=6, interval_minutes=10):
        """Calculate time-lagged correlations between tag pairs."""
        if df.empty or len(tags) < 2:
            return []

        lag_results = []
        for i, tag1 in enumerate(tags):
            for j, tag2 in enumerate(tags):
                if i >= j or tag1 not in df.columns or tag2 not in df.columns:
                    continue

                series1 = df[tag1].dropna().reset_index(drop=True)
                series2 = df[tag2].dropna().reset_index(drop=True)
                if len(series1) < 10 or len(series2) < 10:
                    continue

                best_corr = 0.0
                best_lag = 0
                for lag in range(max_lag + 1):
                    if lag == 0:
                        # No lag
                        s1, s2 = series1, series2
                    else:
                        # Shift series1 forward by `lag` buckets against series2
                        if len(series1) <= lag or len(series2) <= lag:
                            continue
                        s1 = series1.iloc[lag:]
                        s2 = series2.iloc[:-lag]

                    # Truncate to a common length so pearsonr gets equal-sized inputs
                    min_len = min(len(s1), len(s2))
                    if min_len < 10:
                        continue
                    try:
                        corr, _ = pearsonr(s1.iloc[:min_len], s2.iloc[:min_len])
                    except Exception:
                        continue
                    if abs(corr) > abs(best_corr):
                        best_corr = corr
                        best_lag = lag

                if abs(best_corr) > 0.3:  # Only include meaningful correlations
                    lag_results.append({
                        'tag_x': tag1,
                        'tag_y': tag2,
                        'correlation': best_corr,
                        'lag': best_lag,
                        # Lags are counted in sampling buckets; convert to minutes
                        'lag_minutes': best_lag * interval_minutes
                    })

        # Sort by correlation strength and keep the top 20 pairs
        lag_results.sort(key=lambda x: abs(x['correlation']), reverse=True)
        return lag_results[:20]

    def perform_pca_analysis(self, df, tags):
        """Perform Principal Component Analysis."""
        if df.empty or len(tags) < 3:
            return None

        # Prepare data
        data_cols = [col for col in tags if col in df.columns]
        data = df[data_cols].dropna()
        if data.empty or len(data) < 5:
            return None

        # Standardize so tags with large absolute values do not dominate
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)

        # Perform PCA
        pca = PCA()
        pca.fit(scaled_data)

        # Get explained variance
        explained_variance = pca.explained_variance_ratio_

        # Component loadings: rows are tags, columns are principal components
        components = pd.DataFrame(
            pca.components_.T,
            columns=[f'PC{i + 1}' for i in range(len(pca.components_))],
            index=data.columns
        )

        return {
            'explained_variance': explained_variance.tolist(),
            'components': components.to_dict(),
            'cumulative_variance': np.cumsum(explained_variance).tolist(),
            'n_components': len(explained_variance)
        }

    def generate_insights(self, corr_matrix, p_values, lag_correlations=None):
        """Generate insights from correlation analysis."""
        if corr_matrix.empty:
            return {}

        insights = {
            'strong_positive': [],
            'strong_negative': [],
            'moderate_correlations': [],
            'weak_correlations': [],
            'significant_lags': [],
            'summary': {}
        }

        # Analyze correlations
        strong_pos_count = 0
        strong_neg_count = 0
        all_correlations = []

        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.columns):
                if i >= j:  # Avoid duplicates and self-correlation
                    continue
                corr = float(corr_matrix.loc[col1, col2])
                p_val = float(p_values.loc[col1, col2]) if not p_values.empty else 1.0
                all_correlations.append(abs(corr))

                if abs(corr) >= 0.7 and p_val < 0.05:
                    insight = {
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Strong'
                    }
                    if corr > 0:
                        insights['strong_positive'].append(insight)
                        strong_pos_count += 1
                    else:
                        insights['strong_negative'].append(insight)
                        strong_neg_count += 1
                elif 0.3 <= abs(corr) < 0.7:
                    insights['moderate_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Moderate'
                    })
                else:
                    insights['weak_correlations'].append({
                        'tag_pair': [col1, col2],
                        'correlation': corr,
                        'p_value': p_val,
                        'strength': 'Weak'
                    })

        # Add lag correlations that are both strong and genuinely lagged
        if lag_correlations:
            insights['significant_lags'] = [
                lag for lag in lag_correlations
                if abs(lag['correlation']) > 0.5 and lag['lag'] > 0
            ][:10]

        # Summary statistics
        insights['summary'] = {
            'total_pairs': len(all_correlations),
            'strong_positive_count': strong_pos_count,
            'strong_negative_count': strong_neg_count,
            'average_correlation': float(np.mean(all_correlations)) if all_correlations else 0,
            'max_correlation': max(all_correlations) if all_correlations else 0,
            'highly_correlated_pairs': strong_pos_count + strong_neg_count
        }
        return insights
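# Standalone usage sketch (not exercised by the Flask routes below; the tag
# names 'TAG_A'/'TAG_B' are hypothetical -- real names live in id_names):
#
#   analyzer = CorrelationAnalyzer()
#   df = analyzer.get_synchronized_data(['TAG_A', 'TAG_B'],
#                                       hours_back=24, interval_minutes=10)
#   corr, pvals = analyzer.calculate_correlation_matrix(
#       df, method='spearman', tags=['TAG_A', 'TAG_B'])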
# Initialize analyzer
analyzer = CorrelationAnalyzer()


@app.route('/')
def dashboard():
    """Main dashboard."""
    return render_template('correlation_dashboard.html')


@app.route('/api/get_tags')
def get_tags():
    """API endpoint to get available tags."""
    try:
        tags = analyzer.get_available_tags()
        return jsonify({
            'success': True,
            'tags': tags,
            'total_count': len(tags)
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })
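# Request payload accepted by /api/analyze below (fields map one-to-one onto
# the handler's data.get(...) calls; the values shown are the defaults):
#
#   {
#     "tags": ["..."],                   # 2-25 tag names (required)
#     "time_range": 24,                  # hours of history
#     "sampling_interval": 10,           # bucket size in minutes
#     "correlation_method": "pearson",   # or "spearman" / "kendall"
#     "lag_analysis": false,
#     "max_lag": 6,                      # in sampling buckets
#     "include_pca": false
#   }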
@app.route('/api/analyze', methods=['POST'])
def analyze_correlations():
    """API endpoint for correlation analysis."""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        sampling_interval = int(data.get('sampling_interval', 10))
        correlation_method = data.get('correlation_method', 'pearson')
        lag_analysis = data.get('lag_analysis', False)
        max_lag = int(data.get('max_lag', 6))
        include_pca = data.get('include_pca', False)

        if len(selected_tags) < 2:
            raise ValueError("Please select at least 2 variables")
        if len(selected_tags) > 25:
            raise ValueError("Maximum 25 variables allowed")

        # Get synchronized data
        df = analyzer.get_synchronized_data(selected_tags, time_range, sampling_interval)
        if df.empty:
            raise ValueError("No synchronized data found for selected variables")

        # Calculate correlation matrix
        corr_matrix, p_values = analyzer.calculate_correlation_matrix(
            df, correlation_method, selected_tags
        )

        # Calculate lag correlations if requested (pass the actual sampling
        # interval so lag_minutes is reported correctly)
        lag_correlations = []
        if lag_analysis:
            lag_correlations = analyzer.calculate_lag_correlations(
                df, selected_tags, max_lag, sampling_interval
            )

        # Perform PCA if requested
        pca_results = None
        if include_pca and len(selected_tags) >= 3:
            pca_results = analyzer.perform_pca_analysis(df, selected_tags)

        # Generate insights
        insights = analyzer.generate_insights(corr_matrix, p_values, lag_correlations)

        # Create correlation heatmap
        heatmap_data = create_correlation_heatmap(corr_matrix)

        return jsonify({
            'success': True,
            'correlation_matrix': corr_matrix.to_dict() if not corr_matrix.empty else {},
            'p_values': p_values.to_dict() if not p_values.empty else {},
            'lag_correlations': lag_correlations,
            'insights': insights,
            'pca_results': pca_results,
            'heatmap_data': heatmap_data,
            'data_points': len(df),
            'tags': selected_tags,
            'analysis_params': {
                'time_range': time_range,
                'method': correlation_method,
                'sampling_interval': sampling_interval
            }
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })


@app.route('/api/scatter_plot', methods=['POST'])
def get_scatter_plot():
    """API endpoint for scatter plot data."""
    try:
        data = request.json
        tag_x = data.get('tag_x')
        tag_y = data.get('tag_y')
        time_range = int(data.get('time_range', 24))

        if not tag_x or not tag_y:
            raise ValueError("Both X and Y variables must be specified")

        # Get data for the two tags at a 5-minute sampling interval
        df = analyzer.get_synchronized_data([tag_x, tag_y], time_range, 5)
        if df.empty or tag_x not in df.columns or tag_y not in df.columns:
            raise ValueError("No data found for the selected variables")

        # Create scatter plot
        scatter_data = create_scatter_plot(df, tag_x, tag_y)
        return jsonify({
            'success': True,
            'scatter_data': scatter_data,
            'tag_x': tag_x,
            'tag_y': tag_y
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })


@app.route('/api/export', methods=['POST'])
def export_data():
    """API endpoint for data export."""
    try:
        data = request.json
        selected_tags = data.get('tags', [])
        time_range = int(data.get('time_range', 24))
        export_format = data.get('format', 'csv')

        if not selected_tags:
            raise ValueError("No variables selected for export")

        # Get data
        df = analyzer.get_synchronized_data(selected_tags, time_range, 5)
        if df.empty:
            raise ValueError("No data to export")

        # Keep only the tags that actually came back, plus the timestamp
        export_cols = [t for t in selected_tags if t in df.columns]
        export_df = df[export_cols + ['timestamp']].copy()

        if export_format == 'csv':
            output = io.StringIO()
            export_df.to_csv(output, index=False)
            return jsonify({
                'success': True,
                'data': output.getvalue(),
                'filename': f'correlation_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
            })
        raise ValueError("Unsupported export format")
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })


def create_correlation_heatmap(corr_matrix):
    """Create correlation heatmap using Plotly."""
    if corr_matrix.empty:
        return None

    fig = go.Figure(data=go.Heatmap(
        z=corr_matrix.values,
        x=corr_matrix.columns.tolist(),
        y=corr_matrix.index.tolist(),
        colorscale='RdBu',
        zmid=0,
        text=np.round(corr_matrix.values, 3),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    ))
    fig.update_layout(
        title='Correlation Matrix Heatmap',
        xaxis_title='Variables',
        yaxis_title='Variables',
        width=800,
        height=600,
        font=dict(size=12)
    )
    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)


def create_scatter_plot(df, tag_x, tag_y):
    """Create scatter plot using Plotly."""
    # Drop NaN rows pairwise so x and y stay aligned and equal-length
    plot_df = df[[tag_x, tag_y, 'timestamp']].dropna()

    fig = go.Figure()

    # Add scatter points, colored by time sequence
    fig.add_trace(go.Scatter(
        x=plot_df[tag_x],
        y=plot_df[tag_y],
        mode='markers',
        marker=dict(
            size=6,
            color=plot_df.index,
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Time Sequence")
        ),
        text=plot_df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S'),
        hovertemplate=f'{tag_x}: %{{x}}<br>{tag_y}: %{{y}}<br>Time: %{{text}}'
    ))

    # Add a least-squares trend line
    z = np.polyfit(plot_df[tag_x], plot_df[tag_y], 1)
    p = np.poly1d(z)
    x_trend = np.linspace(plot_df[tag_x].min(), plot_df[tag_x].max(), 100)
    fig.add_trace(go.Scatter(
        x=x_trend,
        y=p(x_trend),
        mode='lines',
        name='Trend Line',
        line=dict(color='red', width=2, dash='dash')
    ))

    # Report the Pearson correlation in the title
    corr, p_val = pearsonr(plot_df[tag_x], plot_df[tag_y])
    fig.update_layout(
        title=f'Scatter Plot: {tag_x} vs {tag_y}<br>'
              f'Correlation: {corr:.3f} (p-value: {p_val:.3f})',
        xaxis_title=tag_x,
        yaxis_title=tag_y,
        width=800,
        height=600,
        showlegend=True
    )
    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
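# Example client call (sketch; assumes the server is running on port 5000 and
# the `requests` package is installed -- the tag names are hypothetical, use
# /api/get_tags to discover real ones):
#
#   import requests
#   resp = requests.post('http://localhost:5000/api/analyze',
#                        json={'tags': ['TAG_A', 'TAG_B'],
#                              'correlation_method': 'pearson',
#                              'lag_analysis': True})
#   print(resp.json()['insights']['summary'])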