Monday, January 05, 2026

Python code of Leiden Communities on Japanese eel ovarian transcriptome

import pandas as pd

import numpy as np

import matplotlib.pyplot as plt

import networkx as nx

import seaborn as sns

import requests

import time

from pathlib import Path

from typing import List, Tuple, Optional, Dict

from gseapy import enrichr

import warnings


warnings.filterwarnings('ignore')


# --- 1. CNA 核心分析類別 ---

class CNA_Dual_Analyzer:

    def __init__(self, species: int = 9606, max_api_genes: int = 200):

        self.species = species

        self.max_api_genes = max_api_genes

        self.api_url = "https://string-db.org/api/json/network"

        

    def _safe_string_api(self, genes: List[str]) -> pd.DataFrame:

        if len(genes) < 2: return pd.DataFrame()

        batch_genes = genes[:self.max_api_genes]

        params = {

            "identifiers": "%0d".join(batch_genes),

            "species": self.species,

            "caller_identity": "cna_v2_1"

        }

        try:

            resp = requests.post(self.api_url, data=params, timeout=30)

            resp.raise_for_status()

            data = resp.json()

            if not data: return pd.DataFrame()

            df = pd.DataFrame(data)

            return df[['preferredName_A', 'preferredName_B', 'score']].dropna()

        except Exception as e:

            print(f"⚠️ STRING API失敗: {e}")

            return pd.DataFrame()


    def compute_cna_metrics(self, cluster_data: pd.DataFrame, at_cols: List[str], mt_cols: List[str]) -> pd.DataFrame:

        metrics = []

        for _, row in cluster_data.iterrows():

            gene = str(row.get('Gene_symbol', row.index[0])).strip().upper()

            if not gene or gene in ['NAN', 'NONE', '']: continue

            mean_at = row[at_cols].mean() if at_cols else 0.0

            mean_mt = row[mt_cols].mean() if mt_cols else 0.0

            asinh_diff = np.arcsinh(mean_mt) - np.arcsinh(mean_at)

            log2fc = np.log2((mean_mt + 1e-6) / (mean_at + 1e-6))

            metrics.append({

                'Gene': gene, 'Mean_AT': float(mean_at), 'Mean_MT': float(mean_mt),

                'asinh_Diff': float(asinh_diff), 'Log2FC': float(log2fc),

                'Expr_Change': 'UP' if asinh_diff > 0 else 'DOWN'

            })

        return pd.DataFrame(metrics)


    def build_control_network(self, genes: List[str], metrics_df: pd.DataFrame) -> Tuple[Optional[nx.Graph], pd.DataFrame]:

        ppi_df = self._safe_string_api(genes)

        if ppi_df.empty: return None, pd.DataFrame()

        G = nx.Graph()

        for _, row in ppi_df.iterrows():

            G.add_edge(row['preferredName_A'], row['preferredName_B'], weight=float(row['score']) / 1000)

        

        centrality = nx.degree_centrality(G)

        results = []

        for node in G.nodes():

            gene_metrics = metrics_df[metrics_df['Gene'] == node]

            asinh_diff = gene_metrics['asinh_Diff'].iloc[0] if not gene_metrics.empty else 0.0

            ctrl_score = centrality.get(node, 0) * (abs(asinh_diff) + 1)

            results.append({

                'Gene': node, 'Control_Score': ctrl_score, 'Centrality': centrality.get(node, 0),

                'asinh_Diff': asinh_diff, 'Log2FC': gene_metrics['Log2FC'].iloc[0] if not gene_metrics.empty else 0.0,

                'Expr_Change': gene_metrics['Expr_Change'].iloc[0] if not gene_metrics.empty else 'NONE'

            })

        stats_df = pd.DataFrame(results).sort_values('Control_Score', ascending=False)

        return G, stats_df


# --- 2. 視覺化類別 (整合 CNetplot) ---

class NetworkVisualizer:

    @staticmethod

    def create_cna_network(G, stats_df, output_path, cluster_id):

        """核心 PPI 控制網絡圖"""

        if G is None or stats_df.empty: return

        plt.figure(figsize=(12, 10), dpi=100)

        pos = nx.spring_layout(G, k=0.3, iterations=50)

        sizes = 100 + stats_df.set_index('Gene').reindex(G.nodes())['Control_Score'] * 2000

        colors = stats_df.set_index('Gene').reindex(G.nodes())['asinh_Diff']

        nx.draw_networkx_nodes(G, pos, node_size=sizes, node_color=colors, cmap='RdYlBu_r', alpha=0.8)

        nx.draw_networkx_edges(G, pos, alpha=0.2)

        top_genes = stats_df.head(20)['Gene'].tolist()

        nx.draw_networkx_labels(G, pos, {n: n for n in G.nodes() if n in top_genes}, font_size=8)

        plt.title(f"CNA Network: Cluster {cluster_id}")

        plt.savefig(output_path, bbox_inches='tight')

        plt.close()


    @staticmethod

    def create_cnet_plot(stats_df: pd.DataFrame, output_path: Path, cluster_id: str):

        """生成 CNetplot (Gene-Pathway Network)"""

        print(f"🧬 正在為 Cluster {cluster_id} 生成 CNetplot...")

        genes = stats_df.head(100)['Gene'].tolist() # 取控制力前100的基因

        try:

            enr = enrichr(gene_list=genes, gene_sets='KEGG_2021_Human', organism='Human', outdir=None)

            res = enr.results.nsmallest(5, 'Adjusted P-value') # 取前5個通路

            

            if res.empty: return


            G = nx.Graph()

            # 建立基因與通路的連接

            gene_colors = dict(zip(stats_df['Gene'], stats_df['asinh_Diff']))

            

            for _, row in res.iterrows():

                pathway = row['Term'].split('__')[-1]

                G.add_node(pathway, type='pathway')

                path_genes = row['Genes'].split(';')

                for g in path_genes:

                    if g in genes:

                        G.add_node(g, type='gene')

                        G.add_edge(pathway, g)


            plt.figure(figsize=(14, 10))

            pos = nx.spring_layout(G, k=0.5)

            

            # 繪製路徑節點 (大橘色)

            path_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'pathway']

            nx.draw_networkx_nodes(G, pos, nodelist=path_nodes, node_color='#FF7F0E', 

                                   node_size=1200, node_shape='p', label='Pathways')

            

            # 繪製基因節點 (顏色代表 asinh_Diff)

            gene_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'gene']

            g_colors = [gene_colors.get(n, 0) for n in gene_nodes]

            g_sizes = [stats_df[stats_df['Gene']==n]['Control_Score'].iloc[0]*800 + 100 for n in gene_nodes]

            

            nodes = nx.draw_networkx_nodes(G, pos, nodelist=gene_nodes, node_color=g_colors, 

                                           node_size=g_sizes, cmap='RdYlBu_r', label='Genes')

            

            nx.draw_networkx_edges(G, pos, alpha=0.3, edge_color='gray')

            nx.draw_networkx_labels(G, pos, font_size=9, font_weight='bold')

            

            plt.colorbar(nodes, label='asinh Difference')

            plt.title(f"CNetplot: Top Pathways & Control Genes (Cluster {cluster_id})")

            plt.legend(scatterpoints=1)

            plt.savefig(output_path, dpi=300, bbox_inches='tight')

            plt.close()

        except Exception as e:

            print(f"⚠️ CNetplot失敗: {e}")


# --- 3. 主程序 ---

def main_pipeline(excel_path: Path):

    output_dir = Path.home() / "Desktop" / "CNA_Analysis_CNet"

    output_dir.mkdir(exist_ok=True)

    

    df = pd.read_excel(excel_path)

    at_cols = [c for c in df.columns if 'AT' in str(c).upper()]

    mt_cols = [c for c in df.columns if 'MT' in str(c).upper()]

    gene_col = next(c for c in ['Gene_symbol', 'GeneID'] if c in df.columns)

    

    analyzer = CNA_Dual_Analyzer()

    visualizer = NetworkVisualizer()

    

    clusters = sorted(df['leiden'].unique().astype(str))

    

    for cid in clusters:

        print(f"\n🚀 處理 Cluster {cid}...")

        cluster_df = df[df['leiden'].astype(str) == cid]

        cluster_dir = output_dir / f"Cluster_{cid}"

        cluster_dir.mkdir(exist_ok=True)

        

        # 1. 計算指標

        metrics_df = analyzer.compute_cna_metrics(cluster_df, at_cols, mt_cols)

        

        # 2. 構建 CNA 網絡

        genes = metrics_df['Gene'].tolist()

        G, stats_df = analyzer.build_control_network(genes, metrics_df)

        

        if not stats_df.empty:

            stats_df.to_excel(cluster_dir / "CNA_Report.xlsx", index=False)

            

            # 3. 繪製 PPI 控制圖

            visualizer.create_cna_network(G, stats_df, cluster_dir / "CNA_Network.png", cid)

            

            # 4. 繪製 CNetplot (路徑-基因交互圖)

            visualizer.create_cnet_plot(stats_df, cluster_dir / "CNetplot.png", cid)

            

        time.sleep(1)


if __name__ == "__main__":

    file_path = Path.home() / "Desktop" / "Aja 21649_LeidCom_clustered.xlsx"

    main_pipeline(file_path)

No comments: