
1. 项目概述为什么我们需要自己写异步压测脚本在算法工程的实际落地中我们经常会遇到需要评估服务接口性能的场景。无论是新模型上线前的容量评估还是日常的稳定性巡检压力测试都是不可或缺的一环。市面上有成熟的工具比如JMeter、Locust、甚至是简单的ab命令那为什么我们还要费劲用Python自己写脚本呢这恰恰是算法工程师和普通测试工程师视角的差异所在。对于算法服务尤其是异步接口压测的诉求往往更复杂。一个典型的异步接口流程是客户端发起一个预测请求服务端可能先返回一个task_id然后客户端需要轮询另一个状态查询接口来获取最终结果。这种“请求-轮询”模式用标准压测工具配置起来比较繁琐特别是当我们需要模拟真实业务逻辑比如根据返回的task_id动态构造下一次轮询请求时。自己写Python脚本意味着我们可以将复杂的业务断言、结果校验、甚至是一些简单的后处理逻辑比如计算模型推理的P99延迟都内嵌到压测流程中实现高度定制化的测试场景。更关键的是算法服务的性能指标不仅仅是QPS每秒查询率和响应时间。我们可能关心在特定并发下GPU显存的占用率、批处理Batch的效率、甚至模型预热对首请求延迟的影响。这些细粒度的监控和断言用通用工具很难直接实现而用Python脚本我们可以轻松地集成psutil、GPUtil等库在发起请求的同时采集服务端资源数据或者通过解析日志来验证特定事件。所以这个脚本项目的核心价值在于用代码的灵活性去满足算法服务压测的特殊性和深度需求。2. 核心设计思路构建一个可扩展的异步压测框架直接写一个简单的for循环加requests库也能发起请求但那不是工程化的做法。一个健壮的压测脚本应该像一个微型框架具备清晰的模块划分和扩展性。我的设计通常围绕以下几个核心模块展开这样无论是换接口、改断言还是加监控都只需要改动局部代码。2.1 核心模块拆解一个完整的异步压测脚本框架可以抽象为以下几个部分任务生成器Task Producer负责生成每一次压测请求的“任务描述”。对于异步接口一个任务可能包含两个阶段提交请求和轮询结果。生成器需要能动态构造请求参数并管理任务的状态如初始态、已提交、轮询中、已完成、失败。并发控制器Concurrency Controller这是压力来源的核心。我们不会用简单的多线程因为Python的GIL限制多线程在CPU密集型场景并不高效。对于I/O密集型的HTTP请求我们首选asyncioaiohttp的异步协程方案或者使用concurrent.futures的ThreadPoolExecutor。这里我强烈推荐asyncio它能用单线程管理成千上万个并发连接资源开销极小是压测高并发场景的利器。请求客户端HTTP Client封装具体的HTTP请求逻辑。使用aiohttp.ClientSession来保持连接池复用TCP连接能极大提升效率。这里需要处理好请求超时、重试、异常捕获以及日志记录。结果收集与分析器Result Collector Analyzer每个请求完成后需要记录关键指标状态码、响应时间区分提交阶段和总耗时、是否成功、返回数据等。收集器需要是线程/协程安全的。分析器则定期或在压测结束后计算聚合指标总QPS、成功率、平均/最小/最大/P95/P99延迟并生成可视化报告。监控钩子Monitoring Hooks这是体现定制化能力的地方。我们可以在任务生命周期的不同节点如任务开始、收到提交响应、轮询完成插入钩子函数执行自定义逻辑比如记录当前时间戳到全局队列用于计算吞吐量或者调用另一个接口查询服务端监控指标。2.2 为什么选择 asyncio aiohttp你可能听说过gevent或者multiprocessing。选择asyncio生态主要基于以下几点考量官方原生支持asyncio是Python 3.4的标准库意味着无需额外安装复杂依赖兼容性和未来可维护性更好。高性能I/O对于压测这种“发出大量网络请求并等待”的典型I/O密集型任务异步模型比多线程上下文切换开销更小比多进程资源占用更少能轻松模拟出数千上万的并发用户。清晰的代码结构使用async/await语法代码逻辑看起来像同步代码一样直观避免了回调地狱Callback Hell调试和异常追踪也相对容易。丰富的生态aiohttp是功能极其强大的异步HTTP客户端/服务器库完美匹配asyncio提供了会话、连接池、超时控制等生产级功能。注意asyncio的编程模型与同步代码有根本不同。如果你的压测逻辑中有阻塞式调用如某个库只提供同步接口必须使用loop.run_in_executor将其放到线程池中运行否则会阻塞整个事件循环导致并发数骤降。3. 实操步骤从零搭建压测脚本理论讲完了我们直接上代码。我会以一个模拟的异步图片分类接口为例假设它有两个端点POST /api/v1/async/predict提交图片返回{task_id: xxx, status: pending}GET /api/v1/async/result/{task_id}轮询任务结果成功时返回{status: success, result: {...}}3.1 环境准备与依赖安装首先确保你的Python版本在3.7以上。然后安装核心依赖pip install aiohttp httpx psutil matplotlib pandasaiohttp异步HTTP客户端。httpx一个现代化的同步/异步HTTP客户端这里我们主要用它的同步接口来辅助做一些配置读取或简单的健康检查它的API设计非常友好。psutil用于在压测过程中监控本机资源可选如果你想看压测机是否成为瓶颈。matplotlibpandas用于最终生成漂亮的图表和数据分析报告。项目目录结构可以这样组织async_stress_test/ ├── config.yaml # 配置文件存放URL、并发数、压测时长等 ├── stress_test.py # 主脚本压测框架核心 ├── utils/ │ ├── __init__.py │ ├── logger.py # 日志配置模块 │ └── stats.py # 统计指标收集与计算模块 └── data/ # 存放测试用的图片或参数化数据3.2 核心代码实现解析我们聚焦于stress_test.py的核心部分。第一步定义配置和任务模型我们使用一个dataclass来定义压测任务清晰且易于管理。import asyncio import aiohttp import time from dataclasses import dataclass, field from typing import Optional, Dict, Any import uuid from utils.logger import setup_logger from utils.stats import StatsCollector logger setup_logger(__name__) dataclass class AsyncTask: 一个异步压测任务的数据结构 task_id: str field(default_factorylambda: str(uuid.uuid4())) submit_start_time: float 0.0 # 提交请求开始时间 submit_end_time: float 0.0 # 收到提交响应时间 poll_start_time: float 0.0 # 开始轮询时间 poll_end_time: float 0.0 # 收到最终结果时间 status: str pending # pending, submitted, polling, success, failed response_data: Optional[Dict[str, Any]] None error: Optional[str] None property def submission_latency(self): return (self.submit_end_time - self.submit_start_time) * 1000 if self.submit_end_time else None property def total_latency(self): if self.poll_end_time and self.submit_start_time: return (self.poll_end_time - self.submit_start_time) * 1000 return None第二步实现异步请求工作者这是最核心的部分一个协程函数负责处理一个任务的生命周期。class AsyncStressTester: def __init__(self, base_url: str, concurrency: int, total_requests: int, poll_interval: float 0.1, poll_timeout: float 30.0): self.base_url base_url.rstrip(/) self.concurrency concurrency # 并发协程数 self.semaphore asyncio.Semaphore(concurrency) # 控制并发度 self.total_requests total_requests self.poll_interval poll_interval self.poll_timeout poll_timeout self.stats StatsCollector() self.tasks_queue asyncio.Queue() # 用于生产-消费模型 self.session: Optional[aiohttp.ClientSession] None async def _submit_task(self, session: aiohttp.ClientSession, task: AsyncTask) - bool: 提交异步预测请求 task.submit_start_time time.time() url f{self.base_url}/api/v1/async/predict # 模拟图片数据实际可以从文件读取或参数化生成 fake_image_data bfake_image_binary_data form_data aiohttp.FormData() form_data.add_field(image, fake_image_data, filenametest.jpg, content_typeimage/jpeg) try: async with session.post(url, dataform_data, timeoutaiohttp.ClientTimeout(total10)) as response: task.submit_end_time time.time() if response.status 202: # 异步处理通常返回202 Accepted result await response.json() task_id result.get(task_id) if task_id: task.task_id task_id task.status submitted task.response_data result logger.debug(fTask {task.task_id} submitted successfully.) return True else: task.error Response missing task_id return False else: task.error fSubmit failed with status: {response.status} return False except asyncio.TimeoutError: task.error Submit request timeout return False except Exception as e: task.error fSubmit exception: {str(e)} return False async def _poll_task_result(self, session: aiohttp.ClientSession, task: AsyncTask) - bool: 轮询任务结果直到成功、超时或失败 task.poll_start_time time.time() url f{self.base_url}/api/v1/async/result/{task.task_id} start_poll_time time.time() while (time.time() - start_poll_time) self.poll_timeout: try: async with session.get(url, timeoutaiohttp.ClientTimeout(total2)) as response: if response.status 200: result await response.json() status result.get(status) if status success: task.poll_end_time time.time() task.status success task.response_data result logger.debug(fTask {task.task_id} polled successfully.) return True elif status in [pending, processing]: await asyncio.sleep(self.poll_interval) # 等待后继续轮询 continue else: # failed or other status task.error fTask ended with status: {status} task.status failed return False else: task.error fPoll request failed with status: {response.status} task.status failed return False except asyncio.TimeoutError: # 单次轮询超时可能网络抖动继续尝试 continue except Exception as e: task.error fPoll exception: {str(e)} task.status failed return False # 轮询超时 task.error fPolling timeout after {self.poll_timeout}s task.status failed return False async def _worker(self, session: aiohttp.ClientSession, worker_id: int): 单个压测工作协程从队列中消费任务并执行 logger.info(fWorker {worker_id} started.) while True: try: task await self.tasks_queue.get() if task is None: # 收到终止信号 self.tasks_queue.task_done() break async with self.semaphore: # 控制全局并发避免瞬间创建过多连接 # 1. 提交任务 submit_ok await self._submit_task(session, task) if not submit_ok: self.stats.record_failure(task.submission_latency) self.tasks_queue.task_done() continue # 2. 轮询结果 poll_ok await self._poll_task_result(session, task) if poll_ok: self.stats.record_success(task.total_latency, task.submission_latency) else: self.stats.record_failure(task.total_latency) self.tasks_queue.task_done() except Exception as e: logger.error(fWorker {worker_id} encountered an error: {e}, exc_infoTrue) self.tasks_queue.task_done() logger.info(fWorker {worker_id} finished.) async def _producer(self): 生产压测任务放入队列 for i in range(self.total_requests): task AsyncTask() await self.tasks_queue.put(task) # 可以在这里控制任务投放速率例如每秒投放100个 # await asyncio.sleep(0.01) # 放入与worker数量相同的None作为结束信号 for _ in range(self.concurrency): await self.tasks_queue.put(None) async def run(self): 启动压测 logger.info(fStarting stress test with concurrency{self.concurrency}, total_requests{self.total_requests}) start_time time.time() connector aiohttp.TCPConnector(limit0, limit_per_host0) # 取消连接数限制由semaphore控制 timeout aiohttp.ClientTimeout(totalNone) # 会话级超时设为None由每个请求单独控制 async with aiohttp.ClientSession(connectorconnector, timeouttimeout) as session: self.session session # 启动消费者worker workers [asyncio.create_task(self._worker(session, i)) for i in range(self.concurrency)] # 启动生产者 producer_task asyncio.create_task(self._producer()) # 等待所有任务完成 await self.tasks_queue.join() # 等待worker结束 for w in workers: await w await producer_task total_duration time.time() - start_time logger.info(fStress test finished in {total_duration:.2f} seconds.) self.stats.print_summary(total_duration) # 可以在这里调用 self.stats.plot_latency_distribution() 生成图表第三步主函数与配置import yaml def load_config(config_path: str) - Dict: with open(config_path, r) as f: return yaml.safe_load(f) async def main(): config load_config(config.yaml) base_url config[base_url] concurrency config.get(concurrency, 50) total_requests config.get(total_requests, 1000) poll_interval config.get(poll_interval, 0.1) poll_timeout config.get(poll_timeout, 30.0) tester AsyncStressTester(base_url, concurrency, total_requests, poll_interval, poll_timeout) await tester.run() if __name__ __main__: asyncio.run(main())对应的config.yaml示例base_url: http://your-algorithm-service:8080 concurrency: 100 # 并发协程数 total_requests: 5000 # 总请求数 poll_interval: 0.05 # 轮询间隔(秒)根据业务处理速度调整 poll_timeout: 60.0 # 单个任务总轮询超时时间(秒) ramp_up_time: 10 # 压力爬坡时间(秒)可让worker逐步启动3.3 统计模块与结果可视化utils/stats.py是实现专业压测报告的关键。我们需要一个线程/协程安全的收集器。import threading import time import numpy as np from collections import defaultdict from typing import List import matplotlib.pyplot as plt class StatsCollector: def __init__(self): self._lock threading.Lock() self.success_latencies [] # 成功请求的总耗时列表 self.submit_latencies [] # 提交阶段耗时列表 self.failure_latencies [] # 失败请求的耗时列表如果有 self.status_codes defaultdict(int) self.start_time time.time() self.request_count 0 self.success_count 0 self.failure_count 0 def record_success(self, total_latency_ms: float, submit_latency_ms: float None): with self._lock: self.success_latencies.append(total_latency_ms) if submit_latency_ms: self.submit_latencies.append(submit_latency_ms) self.success_count 1 self.request_count 1 def record_failure(self, latency_ms: float None): with self._lock: if latency_ms: self.failure_latencies.append(latency_ms) self.failure_count 1 self.request_count 1 def _percentile(self, data: List[float], p: float) - float: if not data: return 0.0 return np.percentile(data, p) def print_summary(self, total_duration: float): with self._lock: print(\n *60) print(压力测试结果摘要) print(*60) print(f总耗时: {total_duration:.2f} 秒) print(f总请求数: {self.request_count}) print(f成功数: {self.success_count}) print(f失败数: {self.failure_count}) print(f成功率: {(self.success_count/self.request_count*100 if self.request_count else 0):.2f}%) print(f整体QPS: {self.request_count/total_duration:.2f}) print(f成功请求QPS: {self.success_count/total_duration:.2f}) if self.success_latencies: latencies np.array(self.success_latencies) print(f\n总延迟统计 (单位: ms):) print(f 平均: {np.mean(latencies):.2f}) print(f 中位数: {np.median(latencies):.2f}) print(f 最小值: {np.min(latencies):.2f}) print(f 最大值: {np.max(latencies):.2f}) print(f P90: {self._percentile(self.success_latencies, 90):.2f}) print(f P95: {self._percentile(self.success_latencies, 95):.2f}) print(f P99: {self._percentile(self.success_latencies, 99):.2f}) if self.submit_latencies: submit_lats np.array(self.submit_latencies) print(f\n提交阶段延迟统计 (单位: ms):) print(f 平均: {np.mean(submit_lats):.2f}) print(f P99: {self._percentile(self.submit_latencies, 99):.2f}) print(*60) def plot_latency_distribution(self, save_path: str latency_dist.png): 绘制延迟分布直方图 if not self.success_latencies: print(No success data to plot.) return plt.figure(figsize(12, 5)) plt.subplot(1, 2, 1) plt.hist(self.success_latencies, bins50, edgecolorblack, alpha0.7) plt.title(Total Latency Distribution (Success Requests)) plt.xlabel(Latency (ms)) plt.ylabel(Frequency) plt.grid(True, alpha0.3) if self.submit_latencies: plt.subplot(1, 2, 2) plt.hist(self.submit_latencies, bins50, edgecolorblack, alpha0.7, colororange) plt.title(Submission Latency Distribution) plt.xlabel(Latency (ms)) plt.ylabel(Frequency) plt.grid(True, alpha0.3) plt.tight_layout() plt.savefig(save_path, dpi150) print(fLatency distribution chart saved to {save_path})4. 高级技巧与避坑指南在实际压测中你会遇到各种各样的问题。下面是我踩过坑后总结的一些关键点。4.1 连接池与超时设置的艺术这是影响压测稳定性和资源消耗的核心。连接池限制aiohttp.TCPConnector(limit0, limit_per_host0)将连接数限制设置为0意味着不限制。这听起来很危险但在压测场景下我们通常希望由自己的信号量Semaphore来控制并发而不是连接池。但务必注意如果你的压测机资源如文件描述符数量有限或者目标服务器有连接数限制你需要合理设置limit和limit_per_host避免出现“Too many open files”错误。超时策略务必设置合理的超时。我推荐三个层级的超时会话级超时aiohttp.ClientTimeout(totalNone)。设为None不在这一层做限制。请求级超时在每次session.post/get时通过timeoutaiohttp.ClientTimeout(totalX)设置。对于提交请求可以设长一些如10秒对于轮询请求应该设短一些如2秒避免单次轮询卡住。任务级超时我们在_poll_task_result方法中实现的poll_timeout如30秒这是整个异步任务从提交到拿到结果的最大等待时间。DNS缓存长时间压测时DNS解析可能成为瓶颈。可以考虑使用aiohttp.TCPConnector(use_dns_cacheTrue, ttl_dns_cache300)来启用并设置DNS缓存。4.2 压力曲线与爬坡策略直接全并发猛打可能瞬间把服务打挂也无法观察服务在压力逐步上升时的表现。一个更专业的做法是实现压力爬坡Ramp-up。修改_producer和run方法实现逐步增加并发workerasync def run(self): # ... 前面的初始化代码 ... async with aiohttp.ClientSession(...) as session: self.session session # 逐步启动worker实现压力爬坡 workers [] for i in range(self.concurrency): worker asyncio.create_task(self._worker(session, i)) workers.append(worker) if i self.concurrency - 1: # 最后一个worker不等待 await asyncio.sleep(self.ramp_up_time / max(self.concurrency-1, 1)) # ... 其余代码不变 ...同时在_producer中也可以控制任务投放速率模拟不同的请求到达模型如恒定速率、泊松分布。4.3 结果校验与业务断言压测不只是“把请求发出去”还要验证返回结果的正确性。在_poll_task_result方法中收到成功响应后应添加业务逻辑断言。if status success: # 基础校验 if result not in result: task.error Missing result field in response task.status failed return False # 业务逻辑校验例如分类结果的置信度应大于阈值 predictions result[result].get(predictions, []) if predictions: top_pred max(predictions, keylambda x: x[confidence]) if top_pred[confidence] 0.5: # 假设阈值是0.5 task.error fTop confidence {top_pred[confidence]} too low task.status failed # 或者可以定义为‘degraded’ return False task.poll_end_time time.time() task.status success return True4.4 资源监控与瓶颈定位压测时需要同时监控压测机和服务器的资源以判断瓶颈在哪里。压测机监控在脚本中集成psutil定期打印CPU、内存、网络IO。import psutil def monitor_local_resources(interval5): while monitoring: cpu_percent psutil.cpu_percent(interval1) memory psutil.virtual_memory() net_io psutil.net_io_counters() logger.info(fLocal - CPU: {cpu_percent}%, Mem: {memory.percent}%, Net Sent: {net_io.bytes_sent/1024/1024:.2f}MB, Recv: {net_io.bytes_recv/1024/1024:.2f}MB) time.sleep(interval)服务端监控如果服务端有监控接口如/metrics可以在压测间隙或通过单独的协程去拉取并将数据与压测时间线对齐分析QPS上升时服务端的CPU、GPU、内存、队列长度如何变化。4.5 常见问题与排查清单在实际操作中你大概率会遇到下面这些问题问题现象可能原因排查思路与解决方案QPS上不去压测机CPU很低1. 并发数(concurrency)设置过低。2. 目标服务器响应极快请求间隔成为瓶颈。3.asyncio事件循环被阻塞。1. 逐步增加concurrency观察QPS变化。2. 去掉asyncio.sleep或减小间隔使用更激进的任务投放。3. 检查代码中是否有同步阻塞调用如requests.get,time.sleep全部替换为异步版本(aiohttp,asyncio.sleep)。使用loop.set_debug(True)辅助调试。压测中途出现大量TimeoutError或连接错误1. 服务端过载无法处理。2. 压测机端口耗尽或连接数超限。3. 网络或中间件如负载均衡、Nginx有连接数限制。1. 观察服务端监控确认是否达到性能极限。2. 检查压测机ulimit -n增大文件描述符限制。调整TCPConnector的limit参数。3. 检查Nginx的worker_connections、负载均衡器的配置。压测时可能需要分散到多个压测机。成功率突然下降但服务监控正常1. 业务逻辑校验失败如断言不通过。2. 测试数据有问题触发了服务端异常分支。3. 依赖的下游服务出现波动。1. 查看失败任务的error信息定位是哪种校验失败。2. 检查测试数据集的多样性和边界情况。3. 检查服务端日志看是否有非5xx的业务错误。同时监控下游服务。延迟的P99/P999明显高于平均值1. 服务端存在长尾请求如某些复杂样本处理慢。2. 垃圾回收GC停顿。3. 资源竞争如GPU锁、数据库锁。1. 分析高延迟请求对应的输入数据是否有共性。2. 在服务端开启GC日志分析。3. 检查服务端在高压下的资源竞争指标。压测时应关注分位延迟而不仅是平均值。aiohttp报ClientConnectorError1. DNS解析失败。2. 目标服务器端口未监听或网络不通。3. 本地临时端口耗尽。1. 检查网络连通性尝试使用IP直接访问。2. 确认服务是否健康。3. 优化连接复用减少TCP短连接。压测后确保正确关闭ClientSession。5. 脚本的扩展与集成这个基础框架可以根据需要无限扩展参数化数据源将_submit_task中的fake_image_data替换为从文件、数据库或生成器读取的真实测试数据集。分布式压测使用redis或rabbitmq作为任务队列启动多个压测进程/机器共同消费任务实现分布式压测。每个节点运行独立的脚本但共享一个中心化的结果收集服务如写入InfluxDB。与CI/CD集成将脚本封装成命令行工具接受参数。在流水线中在新版本部署后自动触发一轮基准测试Baseline Test与历史性能数据对比如果核心指标如P99延迟退化超过阈值则自动标记发布失败。生成HTML报告使用Jinja2模板将StatsCollector输出的数据渲染成包含图表、表格的详细HTML报告更便于分享和存档。最后分享一个我个人的深刻体会压测脚本的稳定性本身需要被测试。在正式对线上或预发环境进行压测前务必先在一个隔离的测试环境进行充分验证包括脚本的异常处理、资源清理确保没有连接泄漏、以及结果准确性。我曾经因为一个脚本的BUG误将大量错误格式的请求发送到服务导致日志系统被打满这个教训让我养成了“压测脚本先行自测”的习惯。好的压测脚本本身就应该是一个可靠、可观测、可复现的工程产品。