import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
import seaborn as sns
import requests
import time
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from gseapy import enrichr
import warnings
warnings.filterwarnings('ignore')
# --- 1. CNA 核心分析類別 ---
class CNA_Dual_Analyzer:
def __init__(self, species: int = 9606, max_api_genes: int = 200):
self.species = species
self.max_api_genes = max_api_genes
self.api_url = "https://string-db.org/api/json/network"
def _safe_string_api(self, genes: List[str]) -> pd.DataFrame:
if len(genes) < 2: return pd.DataFrame()
batch_genes = genes[:self.max_api_genes]
params = {
"identifiers": "%0d".join(batch_genes),
"species": self.species,
"caller_identity": "cna_v2_1"
}
try:
resp = requests.post(self.api_url, data=params, timeout=30)
resp.raise_for_status()
data = resp.json()
if not data: return pd.DataFrame()
df = pd.DataFrame(data)
return df[['preferredName_A', 'preferredName_B', 'score']].dropna()
except Exception as e:
print(f"⚠️ STRING API失敗: {e}")
return pd.DataFrame()
def compute_cna_metrics(self, cluster_data: pd.DataFrame, at_cols: List[str], mt_cols: List[str]) -> pd.DataFrame:
metrics = []
for _, row in cluster_data.iterrows():
gene = str(row.get('Gene_symbol', row.index[0])).strip().upper()
if not gene or gene in ['NAN', 'NONE', '']: continue
mean_at = row[at_cols].mean() if at_cols else 0.0
mean_mt = row[mt_cols].mean() if mt_cols else 0.0
asinh_diff = np.arcsinh(mean_mt) - np.arcsinh(mean_at)
log2fc = np.log2((mean_mt + 1e-6) / (mean_at + 1e-6))
metrics.append({
'Gene': gene, 'Mean_AT': float(mean_at), 'Mean_MT': float(mean_mt),
'asinh_Diff': float(asinh_diff), 'Log2FC': float(log2fc),
'Expr_Change': 'UP' if asinh_diff > 0 else 'DOWN'
})
return pd.DataFrame(metrics)
def build_control_network(self, genes: List[str], metrics_df: pd.DataFrame) -> Tuple[Optional[nx.Graph], pd.DataFrame]:
ppi_df = self._safe_string_api(genes)
if ppi_df.empty: return None, pd.DataFrame()
G = nx.Graph()
for _, row in ppi_df.iterrows():
G.add_edge(row['preferredName_A'], row['preferredName_B'], weight=float(row['score']) / 1000)
centrality = nx.degree_centrality(G)
results = []
for node in G.nodes():
gene_metrics = metrics_df[metrics_df['Gene'] == node]
asinh_diff = gene_metrics['asinh_Diff'].iloc[0] if not gene_metrics.empty else 0.0
ctrl_score = centrality.get(node, 0) * (abs(asinh_diff) + 1)
results.append({
'Gene': node, 'Control_Score': ctrl_score, 'Centrality': centrality.get(node, 0),
'asinh_Diff': asinh_diff, 'Log2FC': gene_metrics['Log2FC'].iloc[0] if not gene_metrics.empty else 0.0,
'Expr_Change': gene_metrics['Expr_Change'].iloc[0] if not gene_metrics.empty else 'NONE'
})
stats_df = pd.DataFrame(results).sort_values('Control_Score', ascending=False)
return G, stats_df
# --- 2. 視覺化類別 (整合 CNetplot) ---
class NetworkVisualizer:
@staticmethod
def create_cna_network(G, stats_df, output_path, cluster_id):
"""核心 PPI 控制網絡圖"""
if G is None or stats_df.empty: return
plt.figure(figsize=(12, 10), dpi=100)
pos = nx.spring_layout(G, k=0.3, iterations=50)
sizes = 100 + stats_df.set_index('Gene').reindex(G.nodes())['Control_Score'] * 2000
colors = stats_df.set_index('Gene').reindex(G.nodes())['asinh_Diff']
nx.draw_networkx_nodes(G, pos, node_size=sizes, node_color=colors, cmap='RdYlBu_r', alpha=0.8)
nx.draw_networkx_edges(G, pos, alpha=0.2)
top_genes = stats_df.head(20)['Gene'].tolist()
nx.draw_networkx_labels(G, pos, {n: n for n in G.nodes() if n in top_genes}, font_size=8)
plt.title(f"CNA Network: Cluster {cluster_id}")
plt.savefig(output_path, bbox_inches='tight')
plt.close()
@staticmethod
def create_cnet_plot(stats_df: pd.DataFrame, output_path: Path, cluster_id: str):
"""生成 CNetplot (Gene-Pathway Network)"""
print(f"🧬 正在為 Cluster {cluster_id} 生成 CNetplot...")
genes = stats_df.head(100)['Gene'].tolist() # 取控制力前100的基因
try:
enr = enrichr(gene_list=genes, gene_sets='KEGG_2021_Human', organism='Human', outdir=None)
res = enr.results.nsmallest(5, 'Adjusted P-value') # 取前5個通路
if res.empty: return
G = nx.Graph()
# 建立基因與通路的連接
gene_colors = dict(zip(stats_df['Gene'], stats_df['asinh_Diff']))
for _, row in res.iterrows():
pathway = row['Term'].split('__')[-1]
G.add_node(pathway, type='pathway')
path_genes = row['Genes'].split(';')
for g in path_genes:
if g in genes:
G.add_node(g, type='gene')
G.add_edge(pathway, g)
plt.figure(figsize=(14, 10))
pos = nx.spring_layout(G, k=0.5)
# 繪製路徑節點 (大橘色)
path_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'pathway']
nx.draw_networkx_nodes(G, pos, nodelist=path_nodes, node_color='#FF7F0E',
node_size=1200, node_shape='p', label='Pathways')
# 繪製基因節點 (顏色代表 asinh_Diff)
gene_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'gene']
g_colors = [gene_colors.get(n, 0) for n in gene_nodes]
g_sizes = [stats_df[stats_df['Gene']==n]['Control_Score'].iloc[0]*800 + 100 for n in gene_nodes]
nodes = nx.draw_networkx_nodes(G, pos, nodelist=gene_nodes, node_color=g_colors,
node_size=g_sizes, cmap='RdYlBu_r', label='Genes')
nx.draw_networkx_edges(G, pos, alpha=0.3, edge_color='gray')
nx.draw_networkx_labels(G, pos, font_size=9, font_weight='bold')
plt.colorbar(nodes, label='asinh Difference')
plt.title(f"CNetplot: Top Pathways & Control Genes (Cluster {cluster_id})")
plt.legend(scatterpoints=1)
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
except Exception as e:
print(f"⚠️ CNetplot失敗: {e}")
# --- 3. 主程序 ---
def main_pipeline(excel_path: Path):
output_dir = Path.home() / "Desktop" / "CNA_Analysis_CNet"
output_dir.mkdir(exist_ok=True)
df = pd.read_excel(excel_path)
at_cols = [c for c in df.columns if 'AT' in str(c).upper()]
mt_cols = [c for c in df.columns if 'MT' in str(c).upper()]
gene_col = next(c for c in ['Gene_symbol', 'GeneID'] if c in df.columns)
analyzer = CNA_Dual_Analyzer()
visualizer = NetworkVisualizer()
clusters = sorted(df['leiden'].unique().astype(str))
for cid in clusters:
print(f"\n🚀 處理 Cluster {cid}...")
cluster_df = df[df['leiden'].astype(str) == cid]
cluster_dir = output_dir / f"Cluster_{cid}"
cluster_dir.mkdir(exist_ok=True)
# 1. 計算指標
metrics_df = analyzer.compute_cna_metrics(cluster_df, at_cols, mt_cols)
# 2. 構建 CNA 網絡
genes = metrics_df['Gene'].tolist()
G, stats_df = analyzer.build_control_network(genes, metrics_df)
if not stats_df.empty:
stats_df.to_excel(cluster_dir / "CNA_Report.xlsx", index=False)
# 3. 繪製 PPI 控制圖
visualizer.create_cna_network(G, stats_df, cluster_dir / "CNA_Network.png", cid)
# 4. 繪製 CNetplot (路徑-基因交互圖)
visualizer.create_cnet_plot(stats_df, cluster_dir / "CNetplot.png", cid)
time.sleep(1)
if __name__ == "__main__":
file_path = Path.home() / "Desktop" / "Aja 21649_LeidCom_clustered.xlsx"
main_pipeline(file_path)
No comments:
Post a Comment