TravelContentCreator/examples/concurrent_stream_processing.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import logging
import asyncio
import time
from datetime import datetime
# Add the project root to sys.path so the core module can be imported
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.ai_agent import AI_Agent, Timeout

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

# Read the API key from the environment, falling back to a placeholder
API_KEY = os.environ.get("OPENAI_API_KEY", "your_api_key_here")
# Base URL of the API
BASE_URL = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1")
# Model name to use
MODEL_NAME = os.environ.get("OPENAI_MODEL", "gpt-3.5-turbo")

def print_with_timestamp(message, end='\n'):
    """Print a message prefixed with a timestamp and the current asyncio task name."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    try:
        task = asyncio.current_task()
    except RuntimeError:
        # No running event loop (e.g. called from synchronous code)
        task = None
    task_id = task.get_name() if task else "main"
    print(f"[{timestamp}][{task_id}] {message}", end=end, flush=True)

async def process_stream_task(agent, system_prompt, user_prompt, task_id):
    """Process a single streaming generation task."""
    print_with_timestamp(f"Task {task_id} started")
    start_time = time.time()
    try:
        full_response = ""
        async for chunk in agent.async_generate_text_stream(
            system_prompt=system_prompt,
            user_prompt=user_prompt
        ):
            full_response += chunk
            print_with_timestamp(f'Task {task_id} received chunk: "{chunk}"', end="")
        elapsed = time.time() - start_time
        print_with_timestamp(f"\nTask {task_id} finished! Elapsed: {elapsed:.2f}s")
        return {"task_id": task_id, "response": full_response, "success": True, "elapsed": elapsed}
    except Timeout as e:
        elapsed = time.time() - start_time
        print_with_timestamp(f"Task {task_id} timed out: {e}")
        return {"task_id": task_id, "error": str(e), "success": False, "elapsed": elapsed}
    except Exception as e:
        elapsed = time.time() - start_time
        print_with_timestamp(f"Task {task_id} failed: {type(e).__name__} - {e}")
        return {"task_id": task_id, "error": f"{type(e).__name__}: {str(e)}", "success": False, "elapsed": elapsed}

async def run_concurrent_streams():
    """Run several streaming generation tasks concurrently."""
    print_with_timestamp("Starting concurrent stream-processing test...")
    # Create the AI_Agent instance
    agent = AI_Agent(
        base_url=BASE_URL,
        model_name=MODEL_NAME,
        api=API_KEY,
        timeout=30,  # overall request timeout (seconds)
        max_retries=2,
        stream_chunk_timeout=10  # timeout for each streamed chunk (seconds)
    )
    try:
        # Define the individual tasks
        tasks = [
            {
                "id": "city-intro",
                "system": "You are a professional travel guide.",
                "user": "Briefly introduce the history and main attractions of Beijing."
            },
            {
                "id": "food-tips",
                "system": "You are a food expert.",
                "user": "Recommend five Shanghai specialty snacks and briefly describe what makes each one distinctive."
            },
            {
                "id": "trip-advice",
                "system": "You are a travel-planning consultant.",
                "user": "I plan to spend a week in Yunnan; please outline a brief itinerary."
            }
        ]
        start_time = time.time()
        # Create the concurrent tasks
        coroutines = []
        for task in tasks:
            # Give every task its own name so log lines are easy to tell apart
            coro = process_stream_task(
                agent=agent,
                system_prompt=task["system"],
                user_prompt=task["user"],
                task_id=task["id"]
            )
            # Set the task name
            coroutines.append(asyncio.create_task(coro, name=f"Task-{task['id']}"))
        # Wait for all tasks to finish
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        # Summarize and display the results
        total_elapsed = time.time() - start_time
        print_with_timestamp(f"All tasks finished, total elapsed: {total_elapsed:.2f}s")
        success_count = sum(1 for r in results if isinstance(r, dict) and r.get("success", False))
        error_count = len(results) - success_count
        print_with_timestamp(f"Successful tasks: {success_count}, failed tasks: {error_count}")
        # Show the detailed result of each task
        for i, result in enumerate(results):
            if isinstance(result, dict):
                task_id = result.get("task_id", f"unknown-task-{i}")
                if result.get("success", False):
                    print_with_timestamp(f"Task {task_id} succeeded, elapsed: {result.get('elapsed', 0):.2f}s")
                else:
                    print_with_timestamp(f"Task {task_id} failed: {result.get('error', 'unknown error')}")
            else:
                print_with_timestamp(f"Task {i} returned an exception: {result}")
    except Exception as e:
        print_with_timestamp(f"Unhandled error in the concurrent demo: {type(e).__name__} - {e}")
    finally:
        # Release the AI_Agent's resources
        agent.close()
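
# --- Optional variant (illustrative sketch, not part of the original example) ---
# asyncio.gather only returns once every task has finished; asyncio.as_completed
# instead hands back each task as soon as it completes, which lets you report or
# persist results immediately. This sketch reuses process_stream_task and assumes
# the same AI_Agent streaming API used above.
async def run_streams_as_completed(agent, tasks):
    """Handle each streaming task as soon as it finishes instead of waiting for all."""
    pending = [
        asyncio.create_task(
            process_stream_task(
                agent=agent,
                system_prompt=task["system"],
                user_prompt=task["user"],
                task_id=task["id"]
            ),
            name=f"Task-{task['id']}"
        )
        for task in tasks
    ]
    results = []
    for finished in asyncio.as_completed(pending):
        # process_stream_task catches its own errors and returns a result dict
        result = await finished
        results.append(result)
        print_with_timestamp(f"Result available for task {result.get('task_id')}")
    return results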

async def run_sequential_vs_concurrent():
    """Compare the performance of sequential versus concurrent processing."""
    print_with_timestamp("Starting sequential vs. concurrent performance comparison...")
    # Create the AI_Agent instance
    agent = AI_Agent(
        base_url=BASE_URL,
        model_name=MODEL_NAME,
        api=API_KEY,
        timeout=30,
        max_retries=2,
        stream_chunk_timeout=10
    )
    # Define the test tasks
    tasks = [
        {"id": "task-1", "prompt": "List three world-famous tourist attractions and what makes each special."},
        {"id": "task-2", "prompt": "Briefly describe the pros and cons of three different ways of travelling."},
        {"id": "task-3", "prompt": "Recommend three destinations that are well suited to winter travel."}
    ]
    try:
        # Sequential processing
        print_with_timestamp("Starting sequential processing...")
        sequential_start = time.time()
        for task in tasks:
            print_with_timestamp(f"Processing {task['id']}...")
            task_start = time.time()
            try:
                response = ""
                async for chunk in agent.async_generate_text_stream(
                    system_prompt="You are a travel consultant.",
                    user_prompt=task["prompt"]
                ):
                    response += chunk
                task_elapsed = time.time() - task_start
                print_with_timestamp(f"{task['id']} finished, elapsed: {task_elapsed:.2f}s")
            except Exception as e:
                print_with_timestamp(f"{task['id']} failed: {e}")
        sequential_elapsed = time.time() - sequential_start
        print_with_timestamp(f"Sequential processing total elapsed: {sequential_elapsed:.2f}s")
        # Concurrent processing
        print_with_timestamp("\nStarting concurrent processing...")
        concurrent_start = time.time()
        coroutines = []
        for task in tasks:
            coro = process_stream_task(
                agent=agent,
                system_prompt="You are a travel consultant.",
                user_prompt=task["prompt"],
                task_id=task["id"]
            )
            coroutines.append(asyncio.create_task(coro, name=f"Task-{task['id']}"))
        await asyncio.gather(*coroutines)
        concurrent_elapsed = time.time() - concurrent_start
        print_with_timestamp(f"Concurrent processing total elapsed: {concurrent_elapsed:.2f}s")
        # Performance comparison
        speedup = sequential_elapsed / concurrent_elapsed if concurrent_elapsed > 0 else float('inf')
        print_with_timestamp("\nPerformance comparison:")
        print_with_timestamp(f"Sequential elapsed: {sequential_elapsed:.2f}s")
        print_with_timestamp(f"Concurrent elapsed: {concurrent_elapsed:.2f}s")
        print_with_timestamp(f"Speedup: {speedup:.2f}x")
    except Exception as e:
        print_with_timestamp(f"Comparison test failed: {type(e).__name__} - {e}")
    finally:
        agent.close()
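
# --- Optional variant (illustrative sketch, not part of the original example) ---
# With many prompts, launching every request at once may hit provider rate
# limits. A common pattern is to cap concurrency with asyncio.Semaphore; the
# default limit of 2 below is an arbitrary illustration, not a value from this
# project.
async def process_with_limit(agent, task, semaphore):
    """Run one streaming task, but only while holding a semaphore slot."""
    async with semaphore:
        return await process_stream_task(
            agent=agent,
            system_prompt=task.get("system", "You are a travel consultant."),
            user_prompt=task.get("user") or task.get("prompt"),
            task_id=task["id"]
        )

async def run_streams_with_limit(agent, tasks, max_concurrent=2):
    """Run the tasks concurrently, with at most max_concurrent in flight at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(
        *(process_with_limit(agent, task, semaphore) for task in tasks)
    )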

if __name__ == "__main__":
    # Run the concurrency example
    asyncio.run(run_concurrent_streams())
    print("\n" + "=" * 70 + "\n")
    # Run the performance comparison
    asyncio.run(run_sequential_vs_concurrent())
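
# Example invocation (reads the environment variables declared at the top of
# this file; the values below are placeholders, not project defaults):
#   export OPENAI_API_KEY="sk-..."
#   export OPENAI_API_BASE="https://api.openai.com/v1"
#   export OPENAI_MODEL="gpt-3.5-turbo"
#   python examples/concurrent_stream_processing.py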