229 lines
8.4 KiB
Python
229 lines
8.4 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import logging
|
|||
|
|
import asyncio
|
|||
|
|
import time
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# 添加项目根目录到路径,确保可以导入核心模块
|
|||
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|||
|
|
|
|||
|
|
from core.ai_agent import AI_Agent, Timeout
|
|||
|
|
|
|||
|
|
# 配置日志
|
|||
|
|
logging.basicConfig(
|
|||
|
|
level=logging.INFO,
|
|||
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|||
|
|
handlers=[logging.StreamHandler()]
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 从环境变量获取API密钥,或使用默认值
|
|||
|
|
API_KEY = os.environ.get("OPENAI_API_KEY", "your_api_key_here")
|
|||
|
|
# 使用API的基础URL
|
|||
|
|
BASE_URL = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1")
|
|||
|
|
# 使用的模型名称
|
|||
|
|
MODEL_NAME = os.environ.get("OPENAI_MODEL", "gpt-3.5-turbo")
|
|||
|
|
|
|||
|
|
def print_with_timestamp(message, end='\n'):
|
|||
|
|
"""打印带有时间戳和线程ID的消息"""
|
|||
|
|
timestamp = datetime.now().strftime("%H:%M:%S")
|
|||
|
|
task_id = asyncio.current_task().get_name() if asyncio.current_task() else "主线程"
|
|||
|
|
print(f"[{timestamp}][{task_id}] {message}", end=end, flush=True)
|
|||
|
|
|
|||
|
|
async def process_stream_task(agent, system_prompt, user_prompt, task_id):
|
|||
|
|
"""处理单个流式生成任务"""
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 开始")
|
|||
|
|
start_time = time.time()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
full_response = ""
|
|||
|
|
async for chunk in agent.async_generate_text_stream(
|
|||
|
|
system_prompt=system_prompt,
|
|||
|
|
user_prompt=user_prompt
|
|||
|
|
):
|
|||
|
|
full_response += chunk
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 收到块: 「{chunk}」", end="")
|
|||
|
|
|
|||
|
|
elapsed = time.time() - start_time
|
|||
|
|
print_with_timestamp(f"\n任务 {task_id} 完成!耗时: {elapsed:.2f}秒")
|
|||
|
|
return {"task_id": task_id, "response": full_response, "success": True, "elapsed": elapsed}
|
|||
|
|
|
|||
|
|
except Timeout as e:
|
|||
|
|
elapsed = time.time() - start_time
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 超时: {e}")
|
|||
|
|
return {"task_id": task_id, "error": str(e), "success": False, "elapsed": elapsed}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
elapsed = time.time() - start_time
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 异常: {type(e).__name__} - {e}")
|
|||
|
|
return {"task_id": task_id, "error": f"{type(e).__name__}: {str(e)}", "success": False, "elapsed": elapsed}
|
|||
|
|
|
|||
|
|
async def run_concurrent_streams():
|
|||
|
|
"""同时运行多个流式生成任务"""
|
|||
|
|
print_with_timestamp("开始并发流式处理测试...")
|
|||
|
|
|
|||
|
|
# 创建 AI_Agent 实例
|
|||
|
|
agent = AI_Agent(
|
|||
|
|
base_url=BASE_URL,
|
|||
|
|
model_name=MODEL_NAME,
|
|||
|
|
api=API_KEY,
|
|||
|
|
timeout=30, # 请求总超时时间
|
|||
|
|
max_retries=2,
|
|||
|
|
stream_chunk_timeout=10 # 流块超时时间
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 定义不同的任务
|
|||
|
|
tasks = [
|
|||
|
|
{
|
|||
|
|
"id": "城市介绍",
|
|||
|
|
"system": "你是一个专业的旅游指南。",
|
|||
|
|
"user": "请简要介绍北京这座城市的历史和主要景点。"
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "美食推荐",
|
|||
|
|
"system": "你是一个美食专家。",
|
|||
|
|
"user": "推荐5种上海的特色小吃,并简要说明其特点。"
|
|||
|
|
},
|
|||
|
|
{
|
|||
|
|
"id": "旅行建议",
|
|||
|
|
"system": "你是一个旅行规划顾问。",
|
|||
|
|
"user": "我计划去云南旅行一周,请给我一个简要的行程安排。"
|
|||
|
|
}
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
start_time = time.time()
|
|||
|
|
|
|||
|
|
# 创建并发任务
|
|||
|
|
coroutines = []
|
|||
|
|
for task in tasks:
|
|||
|
|
# 为每个任务设置不同名称,便于日志区分
|
|||
|
|
coro = process_stream_task(
|
|||
|
|
agent=agent,
|
|||
|
|
system_prompt=task["system"],
|
|||
|
|
user_prompt=task["user"],
|
|||
|
|
task_id=task["id"]
|
|||
|
|
)
|
|||
|
|
# 设置任务名称
|
|||
|
|
coroutines.append(asyncio.create_task(coro, name=f"Task-{task['id']}"))
|
|||
|
|
|
|||
|
|
# 等待所有任务完成
|
|||
|
|
results = await asyncio.gather(*coroutines, return_exceptions=True)
|
|||
|
|
|
|||
|
|
# 处理并显示结果
|
|||
|
|
total_elapsed = time.time() - start_time
|
|||
|
|
print_with_timestamp(f"所有任务完成,总耗时: {total_elapsed:.2f}秒")
|
|||
|
|
|
|||
|
|
success_count = sum(1 for r in results if isinstance(r, dict) and r.get("success", False))
|
|||
|
|
error_count = len(results) - success_count
|
|||
|
|
|
|||
|
|
print_with_timestamp(f"成功任务: {success_count}, 失败任务: {error_count}")
|
|||
|
|
|
|||
|
|
# 显示每个任务的详细结果
|
|||
|
|
for i, result in enumerate(results):
|
|||
|
|
if isinstance(result, dict):
|
|||
|
|
task_id = result.get("task_id", f"未知任务-{i}")
|
|||
|
|
if result.get("success", False):
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 成功,耗时: {result.get('elapsed', 0):.2f}秒")
|
|||
|
|
else:
|
|||
|
|
print_with_timestamp(f"任务 {task_id} 失败: {result.get('error', '未知错误')}")
|
|||
|
|
else:
|
|||
|
|
print_with_timestamp(f"任务 {i} 返回了异常: {result}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print_with_timestamp(f"并发处理主程序异常: {type(e).__name__} - {e}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
# 关闭 AI_Agent
|
|||
|
|
agent.close()
|
|||
|
|
|
|||
|
|
async def run_sequential_vs_concurrent():
|
|||
|
|
"""比较顺序处理和并发处理的性能差异"""
|
|||
|
|
print_with_timestamp("开始顺序处理与并发处理性能对比测试...")
|
|||
|
|
|
|||
|
|
# 创建 AI_Agent 实例
|
|||
|
|
agent = AI_Agent(
|
|||
|
|
base_url=BASE_URL,
|
|||
|
|
model_name=MODEL_NAME,
|
|||
|
|
api=API_KEY,
|
|||
|
|
timeout=30,
|
|||
|
|
max_retries=2,
|
|||
|
|
stream_chunk_timeout=10
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 定义测试任务
|
|||
|
|
tasks = [
|
|||
|
|
{"id": "任务1", "prompt": "列出三个世界著名的旅游景点及其特色。"},
|
|||
|
|
{"id": "任务2", "prompt": "简述三种不同的旅行方式的优缺点。"},
|
|||
|
|
{"id": "任务3", "prompt": "推荐三个适合冬季旅行的目的地。"}
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 顺序处理
|
|||
|
|
print_with_timestamp("开始顺序处理...")
|
|||
|
|
sequential_start = time.time()
|
|||
|
|
|
|||
|
|
for task in tasks:
|
|||
|
|
print_with_timestamp(f"开始处理 {task['id']}...")
|
|||
|
|
task_start = time.time()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
response = ""
|
|||
|
|
async for chunk in agent.async_generate_text_stream(
|
|||
|
|
system_prompt="你是一个旅游顾问。",
|
|||
|
|
user_prompt=task["prompt"]
|
|||
|
|
):
|
|||
|
|
response += chunk
|
|||
|
|
|
|||
|
|
task_elapsed = time.time() - task_start
|
|||
|
|
print_with_timestamp(f"{task['id']} 完成,耗时: {task_elapsed:.2f}秒")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print_with_timestamp(f"{task['id']} 处理失败: {e}")
|
|||
|
|
|
|||
|
|
sequential_elapsed = time.time() - sequential_start
|
|||
|
|
print_with_timestamp(f"顺序处理总耗时: {sequential_elapsed:.2f}秒")
|
|||
|
|
|
|||
|
|
# 并发处理
|
|||
|
|
print_with_timestamp("\n开始并发处理...")
|
|||
|
|
concurrent_start = time.time()
|
|||
|
|
|
|||
|
|
coroutines = []
|
|||
|
|
for task in tasks:
|
|||
|
|
coro = process_stream_task(
|
|||
|
|
agent=agent,
|
|||
|
|
system_prompt="你是一个旅游顾问。",
|
|||
|
|
user_prompt=task["prompt"],
|
|||
|
|
task_id=task["id"]
|
|||
|
|
)
|
|||
|
|
coroutines.append(asyncio.create_task(coro, name=f"Task-{task['id']}"))
|
|||
|
|
|
|||
|
|
await asyncio.gather(*coroutines)
|
|||
|
|
|
|||
|
|
concurrent_elapsed = time.time() - concurrent_start
|
|||
|
|
print_with_timestamp(f"并发处理总耗时: {concurrent_elapsed:.2f}秒")
|
|||
|
|
|
|||
|
|
# 性能对比
|
|||
|
|
speedup = sequential_elapsed / concurrent_elapsed if concurrent_elapsed > 0 else float('inf')
|
|||
|
|
print_with_timestamp(f"\n性能对比:")
|
|||
|
|
print_with_timestamp(f"顺序处理耗时: {sequential_elapsed:.2f}秒")
|
|||
|
|
print_with_timestamp(f"并发处理耗时: {concurrent_elapsed:.2f}秒")
|
|||
|
|
print_with_timestamp(f"加速比: {speedup:.2f}x")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print_with_timestamp(f"对比测试异常: {type(e).__name__} - {e}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
agent.close()
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
# 运行并发处理示例
|
|||
|
|
asyncio.run(run_concurrent_streams())
|
|||
|
|
|
|||
|
|
print("\n" + "="*70 + "\n")
|
|||
|
|
|
|||
|
|
# 运行性能对比
|
|||
|
|
asyncio.run(run_sequential_vs_concurrent())
|