
pandas/NumPy/SciPy 大数据量处理优化一、pandas 处理千万行数据时的性能问题pandas 处理百万行以内的数据通常不会遇到性能问题。但数据量超过千万行时情况会发生变化一个简单的groupby transform可能从 0.5 秒变成 30 秒一次merge操作会让内存占用超过 32GBapply函数执行时间会很长。很多团队没有 Spark 集群也不想为了分析任务搭建 ClickHouse。他们只有一台 64GB 内存的 Linux 服务器和 Python。在这种情况下优化 pandas 性能是一个实际的工程问题。本文介绍 pandas/NumPy/SciPy 在千万级数据上的性能优化方法涵盖数据类型优化、向量化计算和内存管理。二、性能瓶颈定位优化前先定位瓶颈。pandas 的性能问题通常来自三个方面数据类型占用过大、循环计算、内存拷贝。诊断方法flowchart TD A[性能瓶颈现象] -- B{瓶颈类型判断} B --|CPU 100%| C[计算瓶颈] B --|内存飙升| D[内存瓶颈] B --|两者兼有| E[混合瓶颈] C -- C1[检查: 是否用了 apply/iterrows?] C1 --|是| C2[向量化替代方案] C1 --|否| C3[检查: groupby 策略] D -- D1[检查: 数据类型是否为 object?] D1 --|是| D2[转为 category/数值类型] D1 --|否| D3[检查: 是否存在隐式拷贝] E -- E1[先解决内存瓶颈br/再优化计算] C2 -- F[优化后基准测试] C3 -- F D2 -- F D3 -- F style A fill:#fdd,stroke:#333 style F fill:#dfd,stroke:#333 style E1 fill:#ff9,stroke:#333区分计算瓶颈和内存瓶颈最简单的方法是看系统监控。CPU 占用高而内存稳定是计算瓶颈内存持续增长而 CPU 不高是内存瓶颈两者都高通常是内存瓶颈引发了频繁 GC需要先解决内存问题。三、优化代码示例3.1 数据类型优化import pandas as pd import numpy as np def optimize_dtypes(df: pd.DataFrame) - pd.DataFrame: 数据类型优化将占用内存较大的 object 和 float64 类型 转换为更紧凑的类型。 start_mem df.memory_usage(deepTrue).sum() / 1024**2 # MB for col in df.columns: col_type df[col].dtype # 跳过已经是最优类型的列 if col_type category or pd.api.types.is_datetime64_any_dtype(col_type): continue # object 类型 → category低基数或保持不变高基数 if col_type object: unique_ratio df[col].nunique() / len(df) # 唯一值占比低于 50% 时category 更省内存 if unique_ratio 0.5: df[col] df[col].astype(category) continue # 数值类型降级int64 → int32/int16float64 → float32 if pd.api.types.is_numeric_dtype(col_type): c_min df[col].min() c_max df[col].max() if pd.api.types.is_integer_dtype(col_type): # 根据数值范围选择最小整数类型 if c_min 0: if c_max 255: df[col] df[col].astype(np.uint8) elif c_max 65535: df[col] df[col].astype(np.uint16) elif c_max 4294967295: df[col] df[col].astype(np.uint32) else: if c_min -128 and c_max 127: df[col] df[col].astype(np.int8) elif c_min -32768 and c_max 32767: df[col] df[col].astype(np.int16) elif c_min -2147483648 and c_max 2147483647: df[col] df[col].astype(np.int32) else: # float64 → float32精度损失通常可接受 df[col] df[col].astype(np.float32) end_mem df.memory_usage(deepTrue).sum() / 1024**2 print(f内存优化: {start_mem:.1f}MB → {end_mem:.1f}MB f(节省 {(start_mem - end_mem) / start_mem:.1%})) return df3.2 向量化计算# 反例用 apply 做条件计算慢 # df[user_level] df[score].apply( # lambda x: S if x 90 else A if x 70 else B if x 50 else C # ) # 正例用 NumPy 向量化替代快 10-50 倍 def vectorized_level_mapping(score: pd.Series) - pd.Series: 用 np.select 实现多条件映射避免逐行 apply。 np.select 接受条件列表和结果列表一次性完成所有映射。 conditions [ score 90, score 70, score 50, ] choices [S, A, B] # 不满足任何条件时返回默认值 return pd.Series( np.select(conditions, choices, defaultC), indexscore.index, nameuser_level ) # 复杂场景分组排名的向量化 def vectorized_group_rank( df: pd.DataFrame, group_col: str, value_col: str, ascending: bool False ) - pd.Series: 分组内排名的向量化实现。 常见场景按部门对员工绩效排名、按品类对商品销量排名。 避免使用 df.groupby().apply(lambda x: x.rank()) 改用排序 累计计数的方式性能提升 5-10 倍。 # 先按分组键和值排序 sorted_idx df.sort_values( [group_col, value_col], ascending[True, ascending] ).index # 在每个分组内计算排名等价于组内排序位置 groups df.loc[sorted_idx, group_col] # 组内累计计数 排名 ranks groups.groupby(groups, sortFalse).cumcount() 1 # 按原始索引返回结果 result pd.Series(indexsorted_idx, dataranks.values) return result.reindex(df.index)3.3 内存管理分块处理def chunked_groupby( filepath: str, group_cols: list[str], agg_dict: dict, chunksize: int 500_000 ) - pd.DataFrame: 分块读取 分组聚合处理超过内存容量的 CSV 文件。 核心思路每次只读取 chunksize 行做部分聚合 最后合并所有部分结果再做最终聚合。 agg_dict 示例: {revenue: sum, user_id: nunique} partial_results [] for chunk in pd.read_csv(filepath, chunksizechunksize): # 先对每个 chunk 做部分聚合大幅减少数据量 partial chunk.groupby(group_cols).agg(agg_dict) partial_results.append(partial) # 合并所有部分结果 combined pd.concat(partial_results) # 对合并后的结果做最终聚合 # 注意sum 可以直接再 sumnunique 需要重新计算 final_agg {} for col, func in agg_dict.items(): if func sum: final_agg[col] sum elif func mean: # 均值需要用总和/总计数重新计算 final_agg[col] sum elif func nunique: # nunique 无法从部分结果直接合并需要保留明细 # 这里退化为 count实际场景需要更复杂的处理 final_agg[col] count result combined.groupby(group_cols).agg(final_agg) return result四、优化代价与适用边界性能优化有代价数据类型降级的精度风险。float64 降级为 float32精度从 15-16 位有效数字降到 6-7 位。对于金额计算这个精度损失可能导致分级别差异。例如 1234567.89 在 float32 中会被近似为 1234567.75差了 0.14 元。如果业务对金额精度敏感float 列不能降级只能优化整数列和字符串列。向量化的可读性代价。np.select 和 np.where 的嵌套写法比 apply lambda 难理解。团队中如果有不熟悉 NumPy 的成员维护成本会上升。建议在关键路径上用向量化在非关键路径上保留 apply并在代码注释中写明向量化版本的等价逻辑。分块处理的聚合限制。分块 groupby 对 sum、count、min、max 这类可分解聚合函数支持较好但对 nunique、median、percentile 这类不可分解聚合函数支持有限。nunique 需要看到全量数据才能去重分块处理只能退化为近似算法如 HyperLogLog。适用边界数据量在 1000 万-1 亿行、内存 64GB 以下的单机场景。超过 1 亿行建议考虑 Polars惰性求值 多线程或 DuckDB列式存储 SQL 接口。低于 100 万行pandas 默认配置就够用优化反而增加代码复杂度。五、建议pandas/NumPy/SciPy 的性能优化建议按以下步骤进行加基准测试。在优化前后各跑一遍计时用数据证明优化效果避免感觉变快了的主观判断。优先做数据类型优化。这是投入产出比最高的优化改动小、风险低、收益大。识别性能热点。用%prun或cProfile定位耗时最长的函数只优化热点路径。评估是否需要换工具。如果优化后仍然无法满足性能要求考虑 Polars 或 DuckDB它们的底层架构适合大数据量场景。