#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
整合單細胞分析管線:
1. 數據 QC 與強制全選預處理
2. NMF 最優 k 值網格搜尋 (Elbow & Cophenetic 穩定度分析)
3. 核心 NMF 潛在因子分解與 12 大細胞類型 Jaccard 映射
4. 下遊次要/隱藏信號 (Masked Housekeeping) KEGG 富集分析
5. 各 Factor Prerank GSEA 與仿 R 語言環狀 CNetPlot 繪製
6. STRING API 蛋白質交互作用網絡 (PPI) 與最大連通元件 Hub 基因挖掘
"""
import os
import gzip
import json
import time
import shutil
import hashlib
import warnings
import tempfile
import urllib.request
from pathlib import Path
import numpy as np
import pandas as pd
import scipy.io as sio
import scipy.cluster.hierarchy as hierarchy
from scipy.spatial.distance import pdist
import scanpy as sc
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
import gseapy as gp
from sklearn.decomposition import NMF
from sklearn.preprocessing import MinMaxScaler
# 強制使用無 GUI 後端,避免伺服器環境報錯
matplotlib.use("Agg")
warnings.filterwarnings("ignore")
# =============================================================================
# --- 1. 全局配置整合 (CONFIGURATION) ---
# =============================================================================
CONFIG = {
# 數據路徑 (可透過環境變數覆蓋)
"sc_data_path": Path(
os.environ.get(
"SC_DATA_PATH",
"/Users/yshuang/Documents/Python/data/ovary_filtered_feature_bc_matrix/",
)
),
"output_dir": Path(
os.environ.get(
"OUTPUT_DIR",
"/Users/yshuang/Documents/Python/NMF_GSEA_PPI_Report",
)
),
# QC 模式控制
"qc_mode": "lenient",
"lenient": {
"min_genes_per_cell": 1, # 最寬鬆:有表達即保留
"min_cells_per_gene": 1,
"max_mt_percent": 100.0, # 不過濾粒線體
"hvg_min_mean": 0.0125,
"hvg_max_mean": 3,
"hvg_min_disp": 0.5,
},
"strict": {
"min_genes_per_cell": 200,
"min_cells_per_gene": 3,
"max_mt_percent": 5.0,
"hvg_flavor": "seurat_v3",
"n_top_genes": 2000,
},
# 降維與 NMF 設定
"n_neighbors": 5,
"n_pcs": 30,
"nmf_k": 8, # 核心 Pipeline 採用的 k 值
"k_range_evaluation": range(4, 16, 2), # 網格搜尋掃描範圍
"evaluation_runs": 2, # 計算穩定度時的隨機初始化次數 (正式運作可設 3-5)
# 富集分析
"gene_sets": ["KEGG_2021_Human"],
"n_top_for_enr": 100,
"exclude_top_n": 50,
"top_genes_n": 50,
"enrichment_pval_cutoff": 0.05,
# GSEA 配置
"gsea_permutation_num": 1000,
"gsea_min_size": 15,
"gsea_max_size": 500,
"gsea_pval_cutoff": 0.05,
"gsea_fdr_cutoff": 0.25,
# PPI 配置
"ppi_top_n_genes": 40,
"ppi_score_cutoff": 500,
"ppi_species_id": 9606, # 9606 代表人類 (適用於同源基因比對)
"ppi_max_retries": 3,
"ppi_retry_delay": 2,
# 輸出控制
"enable_transcript_conversion": True,
"dpi": 150,
}
# 🧬 日本鰻核心轉錄本映射表
JAPANESE_EEL_TRANSCRIPT_MAP = {
"ISG15": "AJA11208.p1",
"AGRN": "AJA02345.p1",
"TNFRSF18": "AJA09876.p1",
"TNFRSF4": "AJA05432.p1",
"FAM132A": "AJA01122.p1",
"CPSF3L": "AJA03344.p1",
}
# 🌐 擴充版通用特徵基因集 (12大核心細胞類別)
DEFAULT_TISSUE_MARKERS = {
"Immune_Cell": ["PTPRC", "CD45", "CD3D", "CD3E", "CD14", "LYZ", "CD68", "CSF1R", "CD79A", "MS4A1", "IKZF1"],
"Epithelial_Cell": ["EPCAM", "KRT8", "KRT18", "KRT5", "KRT14", "CDH1", "CLDN1", "SCNN1A", "MUC1"],
"Endothelial_Cell": ["PECAM1", "CD31", "VWF", "CD34", "KDR", "FLT1", "ENG", "NOS3", "CDH5"],
"Stromal_Fibroblast": ["COL1A1", "COL1A2", "COL3A1", "DCN", "VIM", "PDGFRB", "ACTA2", "FAP", "THY1"],
"Proliferating_Cell": ["MKI67", "TOP2A", "PCNA", "MCM2", "MCM6", "CDK1", "CCNB1", "CENPF", "STMN1"],
"Neuronal_Cell": ["SYP", "SNAP25", "MAP2", "TUBB3", "NEFL", "RBFOX3", "NEU", "GAP43", "ELAVL4"],
"Glial_Support_Cell": ["GFAP", "S100B", "SOX9", "SOX10", "MBP", "MPZ", "PLP1", "APOE", "AJA11208.p1"],
"Smooth_Muscle_Pericyte": ["TAGLN", "MYH11", "MYLK", "RGS5", "VTN", "PDGFRB", "CSPG4", "MCAM"],
"Osteo_Chondro_Cell": ["RUNX2", "SP7", "BGLAP", "ALPL", "SOX9", "COL2A1", "ACAN", "COMP"],
"Stem_Progenitor_Cell": ["POU5F1", "OCT4", "NANOG", "SOX2", "MYC", "KLF4", "PROM1", "CD133", "NES"],
"Erythroid_Cell": ["HBA1", "HBB", "GYPA", "GYPB", "ALAS2", "SLC4A1", "ANK1"],
"Parenchymal_Metabolic_Cell": ["ALB", "APOA1", "FABP1", "PCK1", "G6PC1", "CYP3A4", "FAH", "TAT"]
}
# =============================================================================
# --- 2. 基礎工具函數 (IO & UTILS) ---
# =============================================================================
def save_plot(output_path, dpi=None, close=True):
dpi = dpi or CONFIG.get("dpi", 150)
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(output_path, dpi=dpi, bbox_inches="tight", facecolor="white")
if close:
plt.close("all")
def safe_write_dataframe(df, output_path):
if df is None or df.empty:
print(f"ℹ️ 數據為空,跳過寫入: {output_path}")
return
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(output_path.with_suffix(".csv"), index=False, encoding="utf-8-sig")
try:
tmp_path = output_path.parent / f".tmp_{output_path.name}"
with pd.ExcelWriter(tmp_path, engine="xlsxwriter") as writer:
sheet_name = "NMF_Report"
df.to_excel(writer, index=False, sheet_name=sheet_name)
worksheet = writer.sheets[sheet_name]
for i, col in enumerate(df.columns):
max_len = max(df[col].astype(str).map(len).max(), len(str(col))) + 3
worksheet.set_column(i, i, min(max_len, 50))
if tmp_path.exists():
tmp_path.replace(output_path.with_suffix(".xlsx"))
except Exception as e:
print(f"⚠️ Excel 寫入失敗,已保留 CSV 備份. 原因: {e}")
def verify_and_prepare_data(target_dir):
target_dir = Path(target_dir)
target_dir.mkdir(parents=True, exist_ok=True)
required = ["matrix.mtx.gz", "features.tsv.gz", "barcodes.tsv.gz"]
if all((target_dir / f).exists() for f in required):
return True
print("💡 未檢測到本地單細胞資料,自動生成模擬資料組...")
adata = sc.datasets.pbmc3k()
sio.mmwrite(target_dir / "matrix.mtx", adata.X.T)
with open(target_dir / "matrix.mtx", "rb") as f_in, gzip.open(target_dir / "matrix.mtx.gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
(target_dir / "matrix.mtx").unlink()
with open(target_dir / "features.tsv", "w") as f:
for gene in adata.var_names:
f.write(f"{gene}\t{gene}\tGene Expression\n")
with open(target_dir / "features.tsv", "rb") as f_in, gzip.open(target_dir / "features.tsv.gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
(target_dir / "features.tsv").unlink()
with open(target_dir / "barcodes.tsv", "w") as f:
for bc in adata.obs_names:
f.write(f"{bc}\n")
with open(target_dir / "barcodes.tsv", "rb") as f_in, gzip.open(target_dir / "barcodes.tsv.gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
(target_dir / "barcodes.tsv").unlink()
return True
# =============================================================================
# --- 3. 轉錄本轉換器 (TRANSCRIPT CONVERTER) ---
# =============================================================================
def generate_transcript_id(gene_name):
clean_name = str(gene_name).strip()
upper_name = clean_name.upper()
if upper_name in JAPANESE_EEL_TRANSCRIPT_MAP:
return JAPANESE_EEL_TRANSCRIPT_MAP[upper_name]
hash_digest = hashlib.md5(clean_name.encode("utf-8")).hexdigest()
unique_num = int(hash_digest, 16) % 90000 + 10000
return f"AJA{unique_num}.p1"
def expand_gene_weights_to_transcripts(h_gene_df):
factor_cols = [c for c in h_gene_df.columns if c.startswith("F")]
rows = []
for gene_symbol in h_gene_df.index:
transcript_id = generate_transcript_id(gene_symbol)
row = {"Transcriptid": transcript_id, "gene_symbol": gene_symbol}
for col in factor_cols:
row[col] = h_gene_df.loc[gene_symbol, col]
rows.append(row)
df = pd.DataFrame(rows)
return df[["Transcriptid", "gene_symbol"] + factor_cols]
# =============================================================================
# --- 4. NMF K 值優化評估函數 (OPTIMIZATION) ---
# =============================================================================
def evaluate_nmf_k(adata_sub, k_range=range(4, 16, 2), n_run=2):
"""
對單細胞矩陣進行多個 k 值的 NMF 評估,繪製 Elbow 曲線與共振穩定度。
"""
print("\n====== 【優化步驟】進行 NMF 最優 k 值網格搜尋 ======")
nmf_input = adata_sub.X.toarray() if hasattr(adata_sub.X, "toarray") else adata_sub.X
nmf_input = np.clip(nmf_input, a_min=0, a_max=None)
nmf_input_scaled = MinMaxScaler().fit_transform(nmf_input)
# 避免細胞數量過大導致 Cophenetic 階層分群記憶體崩潰 (上限 2000 細胞)
if nmf_input_scaled.shape[0] > 2000:
np.random.seed(42)
indices = np.random.choice(nmf_input_scaled.shape[0], 2000, replace=False)
nmf_input_eval = nmf_input_scaled[indices, :]
print(f"ℹ️ 細胞數過多 ({nmf_input_scaled.shape[0]}),下採樣至 2000 用於穩定度計算。")
else:
nmf_input_eval = nmf_input_scaled
results = []
for k in k_range:
errors = []
cophenetic_scores = []
print(f" 📊 正在評估 k = {k} ... ", end="", flush=True)
for run in range(n_run):
nmf_model = NMF(n_components=k, init="random", random_state=42 + run, max_iter=500, tol=1e-3)
W = nmf_model.fit_transform(nmf_input_eval)
errors.append(nmf_model.reconstruction_err_)
if run == 0:
# 基於單次 W 矩陣計算 Cophenetic 相關係數
# 基於單次 W 矩陣計算 Cophenetic 相關係數
z = hierarchy.linkage(W, method='average')
ciph, _ = hierarchy.cophenet(z, pdist(W)) # 將 cophenetic 改為 cophenet
cophenetic_scores.append(ciph)
results.append({
"k": k,
"Reconstruction_Error": np.mean(errors),
"Cophenetic_Stability": np.mean(cophenetic_scores) if cophenetic_scores else 0
})
print("完成。")
df_res = pd.DataFrame(results)
# 繪圖輸出
fig, ax1 = plt.subplots(figsize=(8, 5))
color = 'tab:red'
ax1.set_xlabel('Number of Factors (k)', fontweight='bold')
ax1.set_ylabel('Reconstruction Error', color=color, fontweight='bold')
ax1.plot(df_res['k'], df_res['Reconstruction_Error'], marker='o', color=color, linewidth=2)
ax1.tick_params(axis='y', labelcolor=color)
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Cophenetic Correlation (Stability)', color=color, fontweight='bold')
ax2.plot(df_res['k'], df_res['Cophenetic_Stability'], marker='s', color=color, linewidth=2, linestyle='--')
ax2.tick_params(axis='y', labelcolor=color)
plt.title('NMF Optimal k Selection (Elbow & Stability)', fontsize=14, fontweight='bold', pad=15)
fig.tight_layout()
save_plot(CONFIG["output_dir"] / "nmf_k_evaluation.png")
print(f"✅ NMF K值評估指標曲線已儲存至: {CONFIG['output_dir'] / 'nmf_k_evaluation.png'}")
return df_res
# =============================================================================
# --- 5. CORE FUNCTION 1: SINGLE-CELL NMF & CLUSTERING ---
# =============================================================================
def run_sc_nmf_pipeline(config):
print("\n====== 【步驟一】單細胞 QC 預處理與 NMF 分群 ======")
verify_and_prepare_data(config["sc_data_path"])
adata = sc.read_10x_mtx(config["sc_data_path"], var_names="gene_symbols", cache=False)
adata.var_names_make_unique()
adata.var["mt"] = adata.var_names.str.upper().str.startswith("MT-")
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
qc = config[config["qc_mode"]]
sc.pp.filter_cells(adata, min_genes=qc["min_genes_per_cell"])
sc.pp.filter_genes(adata, min_cells=qc["min_cells_per_gene"])
adata = adata[adata.obs["pct_counts_mt"] <= qc["max_mt_percent"], :].copy()
sc.pp.normalize_total(adata, target_sum=1e4)
sc.pp.log1p(adata)
adata.raw = adata
# 強制全選基因模式
adata.var["highly_variable"] = True
adata_sub = adata[:, adata.var["highly_variable"]].copy()
print(f"📊 最終納入計算的基因總數: {adata_sub.shape[1]} 個")
sc.tl.pca(adata_sub, n_comps=config["n_pcs"], svd_solver="arpack")
sc.pp.neighbors(adata_sub, n_neighbors=config["n_neighbors"], n_pcs=config["n_pcs"])
sc.tl.umap(adata_sub)
# 執行最優 K 值評估
evaluate_nmf_k(adata_sub, k_range=config["k_range_evaluation"], n_run=config["evaluation_runs"])
print(f"🚀 正在執行正式 NMF 分解 (K = {config['nmf_k']})...")
nmf_input = adata_sub.X.toarray() if hasattr(adata_sub.X, "toarray") else adata_sub.X
nmf_input = np.clip(nmf_input, a_min=0, a_max=None)
scaler = MinMaxScaler()
nmf_input_scaled = scaler.fit_transform(nmf_input)
nmf_model = NMF(n_components=config["nmf_k"], init="nndsvd", random_state=42, max_iter=1000, tol=1e-4)
W_cell = nmf_model.fit_transform(nmf_input_scaled)
H_gene = nmf_model.components_
nmf_labels = np.argmax(W_cell, axis=1)
adata_sub.obs["nmf_raw"] = pd.Categorical([f"F{l+1}" for l in nmf_labels])
for i in range(config["nmf_k"]):
adata_sub.obs[f"F{i+1}"] = W_cell[:, i]
h_gene_df = pd.DataFrame(H_gene.T, index=adata_sub.var_names, columns=[f"F{i+1}" for i in range(config["nmf_k"])])
print(f"📈 正式 NMF Reconstruction Error: {nmf_model.reconstruction_err_:.4f}")
return adata_sub, h_gene_df
# =============================================================================
# --- 6. CORE FUNCTION 2: CELL TYPE MAPPING ---
# =============================================================================
def map_ovary_signatures(adata_nmf, config):
print("\n====== 【步驟二】卵巢特徵基因集相似度映射 ======")
cluster_counts = adata_nmf.obs["nmf_raw"].value_counts()
valid_groups = [grp for grp in cluster_counts.index if cluster_counts[grp] > 1]
if not valid_groups:
print("⚠️ 所有 NMF 群組細胞數過少,跳過映射。")
adata_nmf.obs["ovary_cell_type"] = adata_nmf.obs["nmf_raw"].astype(str)
return adata_nmf, pd.DataFrame(), pd.DataFrame()
sc.tl.rank_genes_groups(adata_nmf, "nmf_raw", groups=valid_groups, method="wilcoxon", use_raw=True)
mapping_results = []
evidence_rows = []
cluster_to_celltype = {}
for grp in valid_groups:
top_genes = list(adata_nmf.uns["rank_genes_groups"]["names"][grp][:100])
top_set = set(top_genes)
best_match, best_jaccard = None, -1
for cell_type, sig_genes in DEFAULT_TISSUE_MARKERS.items():
sig_set = set(sig_genes)
inter = top_set.intersection(sig_set)
union = top_set.union(sig_set)
jaccard = len(inter) / len(union) if union else 0
evidence_rows.append({
"NMF_Cluster": grp,
"Ovary_Signature": cell_type,
"Jaccard_Index": round(jaccard, 4),
"Overlap_Count": len(inter),
"Intersection_Genes": ", ".join(sorted(inter)),
})
if jaccard > best_jaccard:
best_jaccard = jaccard
best_match = cell_type
cluster_to_celltype[grp] = best_match
mapping_results.append({
"NMF_Cluster": grp,
"Best_Match_CellType": best_match,
"Jaccard_Index": round(best_jaccard, 4),
})
adata_nmf.obs["ovary_cell_type"] = adata_nmf.obs["nmf_raw"].map(cluster_to_celltype).astype("category")
df_mapping = pd.DataFrame(mapping_results)
df_evidence = pd.DataFrame(evidence_rows)
print(df_mapping.to_string(index=False))
return adata_nmf, df_mapping, df_evidence
# =============================================================================
# --- 7. CORE FUNCTION 3: DOWNSTREAM GSEA & BUBBLE PLOTS ---
# =============================================================================
def get_enrichment(gene_list, tag, config):
try:
clean_genes = [str(g).strip().upper() for g in gene_list if pd.notna(g) and str(g).strip()]
if not clean_genes:
return pd.DataFrame()
enr = gp.enrichr(gene_list=clean_genes, gene_sets=config["gene_sets"], organism="human").results
pval_cutoff = config.get("enrichment_pval_cutoff", 0.05)
res = enr[enr["Adjusted P-value"] < pval_cutoff].head(10).copy()
if not res.empty:
res["Source"] = tag
return res
except Exception as e:
print(f"⚠️ 模組 {tag} 富集分析錯誤: {e}")
return pd.DataFrame()
def plot_bubble_save(df, title, filename, config):
if df is None or df.empty:
print(f"ℹ️ {title} 無顯著通路,跳過繪圖。")
return
df = df.copy()
df["-log10P"] = -np.log10(df["Adjusted P-value"].astype(float) + 1e-10)
df["Gene_Count"] = df["Overlap"].apply(lambda x: int(str(x).split("/")[0]) if "/" in str(x) else int(x))
n_terms = df["Term"].nunique()
fig, ax = plt.subplots(figsize=(12, 2 + n_terms * 0.35))
scatter = ax.scatter(df["Source"], df["Term"], s=df["Gene_Count"] * 15 + 50, c=df["-log10P"], cmap="viridis", alpha=0.8, edgecolors="0.3", linewidth=0.5)
ax.set_title(title, fontsize=14, pad=20, fontweight="bold")
ax.set_xlabel("NMF Module Cluster", fontsize=12)
ax.set_ylabel("KEGG Pathway", fontsize=12)
ax.grid(axis="y", linestyle="--", alpha=0.5)
cbar = plt.colorbar(scatter, ax=ax)
cbar.set_label("-log10(Adjusted P-value)")
sizes = [df["Gene_Count"].min(), df["Gene_Count"].median(), df["Gene_Count"].max()]
size_legend = [plt.scatter([], [], s=s*15+50, c="gray", alpha=0.6, edgecolors="0.3") for s in sizes]
ax.legend(size_legend, [f"{s}" for s in sizes], title="Gene Count", loc="upper left", bbox_to_anchor=(1.15, 1))
plt.tight_layout()
save_plot(config["output_dir"] / filename)
print(f"✅ 富集氣泡圖已儲存: {config['output_dir'] / filename}")
# =============================================================================
# --- 8. EXTENSION: GSEA FOR EACH FACTOR & VISUALIZATION ---
# =============================================================================
def run_factor_gsea_pipeline(h_gene_df, config):
print("\n====== 【擴充步驟二】各 Factor 之 GSEA 分析與進階視覺化 ======")
gsea_out_dir = config["output_dir"] / "GSEA_Factors"
gsea_out_dir.mkdir(parents=True, exist_ok=True)
gene_set = config["gene_sets"][0]
pval_cutoff = config.get("gsea_pval_cutoff", 0.05)
fdr_cutoff = config.get("gsea_fdr_cutoff", 0.25)
for factor in h_gene_df.columns:
print(f"🧬 正在分析 {factor} ...")
ranked_genes = h_gene_df[factor].sort_values(ascending=False)
ranked_genes = ranked_genes[ranked_genes > 0]
if len(ranked_genes) < 15:
print(f"⚠️ {factor} 有效基因過少,跳過 GSEA。")
continue
rnk_df = pd.DataFrame({"gene_name": ranked_genes.index, "score": ranked_genes.values})
try:
pre_res = gp.prerank(
rnk=rnk_df, gene_sets=gene_set, processes=4,
permutation_num=config.get("gsea_permutation_num", 1000),
outdir=None, seed=42, verbose=False,
min_size=config.get("gsea_min_size", 15), max_size=config.get("gsea_max_size", 500)
)
res_df = pre_res.res2d
if res_df is None or res_df.empty:
continue
sig_col = "NOM p-val" if "NOM p-val" in res_df.columns else "pval"
fdr_col = "FDR q-val" if "FDR q-val" in res_df.columns else "fdr"
significant_res = res_df[(res_df[sig_col] < pval_cutoff) & (res_df[fdr_col] < fdr_cutoff)].sort_values("NES", ascending=False)
if significant_res.empty:
print(f"ℹ️ {factor} 未篩選到顯著富集通路。")
continue
safe_write_dataframe(significant_res, gsea_out_dir / f"{factor}_gsea_results")
# Barplot 繪製
plot_df = significant_res.head(10).copy()
plt.figure(figsize=(8, 1 + len(plot_df) * 0.4))
colors = plt.cm.coolwarm_r(np.interp(plot_df[fdr_col], [0, fdr_cutoff], [0, 1]))
plt.barh(plot_df["Term"], plot_df["NES"], color=colors)
plt.title(f"{factor} - Top Enriched Pathways (GSEA)", fontsize=13, fontweight="bold")
plt.xlabel("Normalized Enrichment Score (NES)")
plt.grid(axis="x", linestyle="--", alpha=0.5)
plt.gca().invert_yaxis()
sm = plt.cm.ScalarMappable(cmap="coolwarm_r", norm=plt.Normalize(0, fdr_cutoff))
sm.set_array([])
cbar = plt.colorbar(sm, ax=plt.gca(), shrink=0.6)
cbar.set_label("FDR q-val")
save_plot(gsea_out_dir / f"{factor}_gsea_barplot.png")
# CNetPlot 繪製
plot_chord_cnetplot(plot_df.head(4), factor, gsea_out_dir)
except Exception as e:
print(f"⚠️ {factor} GSEA 執行失敗: {e}")
def plot_chord_cnetplot(top_pathways, factor_name, out_dir):
G = nx.Graph()
pathway_nodes = []
gene_nodes = set()
gene_col = None
for col in ["lead_genes", "Lead_genes", "matched_genes", "Matched_genes"]:
if col in top_pathways.columns:
gene_col = col
break
if gene_col is None:
return
for _, row in top_pathways.iterrows():
pathway = row["Term"]
raw_genes = str(row[gene_col])
for sep in [";", ","]:
if sep in raw_genes:
core_genes = [g.strip() for g in raw_genes.split(sep) if g.strip()]
break
else:
core_genes = [raw_genes.strip()] if raw_genes.strip() else []
core_genes = core_genes[:8]
pathway_nodes.append(pathway)
G.add_node(pathway, node_type="pathway", size=row.get("NES", 1) * 100)
for gene in core_genes:
gene_nodes.add(gene)
G.add_node(gene, node_type="gene", size=30)
G.add_edge(pathway, gene)
if not pathway_nodes or not gene_nodes:
return
gene_nodes = list(gene_nodes)
pos = {}
n_p = len(pathway_nodes)
for i, p_node in enumerate(pathway_nodes):
angle = 2 * np.pi * i / n_p
pos[p_node] = np.array([0.4 * np.cos(angle), 0.4 * np.sin(angle)])
n_g = len(gene_nodes)
for i, g_node in enumerate(gene_nodes):
angle = 2 * np.pi * i / n_g
pos[g_node] = np.array([1.0 * np.cos(angle), 1.0 * np.sin(angle)])
fig, ax = plt.subplots(figsize=(10, 10))
nx.draw_networkx_edges(G, pos, alpha=0.2, edge_color="gray", width=1.2, ax=ax)
nx.draw_networkx_nodes(G, pos, nodelist=gene_nodes, node_color="skyblue", node_size=150, alpha=0.8, ax=ax)
p_sizes = [G.nodes[p]["size"] for p in pathway_nodes]
nx.draw_networkx_nodes(G, pos, nodelist=pathway_nodes, node_color="salmon", node_size=p_sizes, alpha=0.9, ax=ax)
for g, p_coord in pos.items():
if g in gene_nodes:
angle = np.arctan2(p_coord[1], p_coord[0])
deg = np.degrees(angle)
rotation = deg if -90 <= deg <= 90 else deg + 180
ha = "left" if -90 <= deg <= 90 else "right"
ax.text(p_coord[0]*1.08, p_coord[1]*1.08, s=g, fontsize=8, ha=ha, va="center", rotation=rotation,
bbox=dict(facecolor="white", alpha=0.7, edgecolor="none", pad=0.5))
for p in pathway_nodes:
p_coord = pos[p]
ax.text(p_coord[0], p_coord[1] + 0.08, s=p.split("%")[0], fontsize=10, fontweight="bold", ha="center", va="center",
bbox=dict(facecolor="white", alpha=0.8, edgecolor="none", pad=2))
ax.set_title(f"Chord-like Gene-Concept Network (CNetPlot) - {factor_name}", fontsize=14, fontweight="bold")
ax.axis("off")
save_plot(out_dir / f"{factor_name}_gsea_cnetplot.png")
# =============================================================================
# --- 9. EXTENSION: PPI NETWORK & HUB GENES ANALYSIS (COMPLETED) ---
# =============================================================================
def fetch_string_interactions(gene_list, config):
"""
透過 STRING API 獲取基因列表的交互作用關係對。
"""
genes_url = "%0d".join(gene_list)
url = f"https://string-db.org/api/json/network?identifiers={genes_url}&species={config['ppi_species_id']}"
for attempt in range(config["ppi_max_retries"]):
try:
with urllib.request.urlopen(url, timeout=15) as response:
return json.loads(response.read().decode())
except Exception as e:
print(f"⚠️ STRING API 連線失敗 (嘗試 {attempt+1}/{config['ppi_max_retries']}): {e}")
time.sleep(config["ppi_retry_delay"])
return None
def run_ppi_hub_pipeline(h_gene_df, config):
"""
針對每個 Factor 最顯著的 Top 基因建立 PPI 網絡,並篩選 Hub 核心節點。
"""
print("\n====== 【擴充步驟三】STRING 蛋白質交互作用網絡 (PPI) 與 Hub 基因分析 ======")
ppi_out_dir = config["output_dir"] / "PPI_Networks"
ppi_out_dir.mkdir(parents=True, exist_ok=True)
hub_summary = []
for factor in h_gene_df.columns:
top_genes = h_gene_df[factor].nlargest(config["ppi_top_n_genes"]).index.tolist()
print(f"🌐 正在請求 {factor} 的 STRING 網絡數據...")
network_data = fetch_string_interactions(top_genes, config)
if not network_data:
print(f"ℹ️ {factor} 未能取得 PPI 交互作用數據。")
continue
G = nx.Graph()
for edge in network_data:
score = edge.get("score", 0) * 1000 # 標準化分數
if score >= config["ppi_score_cutoff"]:
G.add_edge(edge["preferredName_A"], edge["preferredName_B"], weight=score)
if G.number_of_nodes() == 0:
print(f"ℹ️ {factor} 在當前 score_cutoff 下無顯著交互作用邊。")
continue
# 計算網路拓樸指標:度中心性 (Degree Centrality)
dc = nx.degree_centrality(G)
sorted_hubs = sorted(dc.items(), key=lambda x: x[1], reverse=True)
top_hubs = [node for node, val in sorted_hubs[:5]]
# 紀錄報告
for node, score in sorted_hubs:
hub_summary.append({
"Factor": factor, "Gene": node, "Degree_Centrality": round(score, 4), "Connections": G.degree(node)
})
# 繪圖展示 (最大連通元件)
largest_cc = max(nx.connected_components(G), key=len)
subG = G.subgraph(largest_cc)
plt.figure(figsize=(8, 8))
pos = nx.spring_layout(subG, k=0.4, seed=42)
# 節點大小與度中心性連動
node_sizes = [dc[node] * 1000 + 100 for node in subG.nodes()]
node_colors = ["red" if node in top_hubs else "orange" for node in subG.nodes()]
nx.draw_networkx_edges(subG, pos, alpha=0.3, edge_color="gray")
nx.draw_networkx_nodes(subG, pos, node_size=node_sizes, node_color=node_colors, alpha=0.8)
nx.draw_networkx_labels(subG, pos, font_size=9, font_weight="bold")
plt.title(f"PPI Network Maximum Component - {factor}\n(Red nodes indicate top hubs)", fontsize=12, fontweight="bold")
plt.axis("off")
save_plot(ppi_out_dir / f"{factor}_ppi_network.png")
print(f" ✅ PPI 網絡圖與 Hub 基因已儲存")
if hub_summary:
safe_write_dataframe(pd.DataFrame(hub_summary), config["output_dir"] / "ppi_hub_centrality_report")
# =============================================================================
# --- 10. MAIN EXECUTION ---
# =============================================================================
def main():
CONFIG["output_dir"].mkdir(parents=True, exist_ok=True)
# 1. 主 NMF 矩陣分解與分群評估
adata_nmf, h_gene_df = run_sc_nmf_pipeline(CONFIG)
# 2. 細胞特徵基因集映射
adata_mapped, df_mapping, df_evidence = map_ovary_signatures(adata_nmf, CONFIG)
# 3. 下游 KEGG 富集分析
print("\n====== 【步驟三】下游基因功能富集分析管線 ======")
main_list = []
for col in h_gene_df.columns:
genes = h_gene_df[col].nlargest(CONFIG["n_top_for_enr"]).index.tolist()
enr = get_enrichment(genes, col, CONFIG)
if not enr.empty:
main_list.append(enr)
df_main = pd.concat(main_list, ignore_index=True) if main_list else pd.DataFrame()
print("🎭 正在排除全域 Housekeeping 基因,挖掘次要信號...")
global_w = h_gene_df.sum(axis=1)
actual_exclude_n = min(CONFIG["exclude_top_n"], len(h_gene_df) - 10)
h_masked = h_gene_df.drop(index=global_w.nlargest(actual_exclude_n).index) if actual_exclude_n > 0 else h_gene_df.copy()
minor_list = []
for col in h_masked.columns:
genes = h_masked[col].nlargest(CONFIG["top_genes_n"]).index.tolist()
enr = get_enrichment(genes, f"{col}_Minor", CONFIG)
if not enr.empty:
minor_list.append(enr)
df_minor = pd.concat(minor_list, ignore_index=True) if minor_list else pd.DataFrame()
# 4. 儲存標準報表數據
print("\n====== 【步驟四】儲存分析報告與圖表 ======")
df_cells = pd.DataFrame(index=adata_mapped.obs_names)
df_cells["ovary_cell_type"] = adata_mapped.obs["ovary_cell_type"]
df_cells["nmf_raw"] = adata_mapped.obs["nmf_raw"]
for i in range(CONFIG["nmf_k"]):
col = f"F{i+1}"
df_cells[col] = adata_mapped.obs.get(col, 0.0)
df_cells = df_cells.reset_index().rename(columns={"index": "Cell_ID"})
safe_write_dataframe(df_cells, CONFIG["output_dir"] / "cell_nmf_scores")
if CONFIG.get("enable_transcript_conversion", False):
safe_write_dataframe(expand_gene_weights_to_transcripts(h_gene_df), CONFIG["output_dir"] / "gene_nmf_weights")
else:
safe_write_dataframe(h_gene_df.reset_index().rename(columns={"index": "Gene_Symbol"}), CONFIG["output_dir"] / "gene_nmf_weights")
safe_write_dataframe(df_mapping, CONFIG["output_dir"] / "ovary_mapping")
safe_write_dataframe(df_evidence, CONFIG["output_dir"] / "ovary_mapping_evidence")
if not df_main.empty: safe_write_dataframe(df_main, CONFIG["output_dir"] / "main_kegg_enrichment")
if not df_minor.empty: safe_write_dataframe(df_minor, CONFIG["output_dir"] / "minor_kegg_enrichment")
# 5. UMAP 降維視覺化
if "ovary_cell_type" in adata_mapped.obs.columns:
sc.pl.umap(adata_mapped, color=["ovary_cell_type"], palette="Set2", show=False)
save_plot(CONFIG["output_dir"] / "umap_ovary_annotated.png")
plot_bubble_save(df_main, "Main Pathway Enrichment", "bubble_main_kegg.png", CONFIG)
plot_bubble_save(df_minor, "Hidden Signal Enrichment", "bubble_minor_kegg.png", CONFIG)
# 6. 觸發擴充進階管線 (GSEA 與 PPI)
run_factor_gsea_pipeline(h_gene_df, CONFIG)
run_ppi_hub_pipeline(h_gene_df, CONFIG)
print("\n🎉 【整合與優化成功】所有主管線與擴充進階分析皆已順利執行完畢!")
return adata_mapped, h_gene_df, df_main, df_minor
if __name__ == "__main__":
main()
No comments:
Post a Comment