bangbang-aigc-server/travel-algorithms/example/document_processing_example.py

338 lines
12 KiB
Python
Raw Permalink Normal View History

2025-07-31 15:35:23 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Document Processing Usage Example
文档处理使用示例 - 展示如何使用重构后的文档处理功能
"""
import asyncio
from pathlib import Path
# 导入算法包
from travel_algorithms import (
create_document_pipeline,
create_default_config,
DocumentProcessor,
TextExtractor,
ContentIntegrator,
ContentTransformer
)
async def simple_document_processing_example():
"""简单的文档处理示例"""
print("📄 文档处理使用示例")
print("=" * 40)
# 1. 创建配置(指向你的资源目录)
config = create_default_config(
resource_base_directory="./resource", # 你的提示词目录
ai_model="qwen-plus" # 可以换成其他模型
)
# 2. 创建文档处理流水线
pipeline = create_document_pipeline(config)
document_processor = pipeline["document_processor"]
print("✅ 文档处理流水线创建成功")
# 3. 创建示例文档
sample_dir = Path("sample_documents")
sample_dir.mkdir(exist_ok=True)
# 创建景区介绍文档
attraction_doc = sample_dir / "attraction_info.txt"
with open(attraction_doc, 'w', encoding='utf-8') as f:
f.write("""
张家界国家森林公园
张家界国家森林公园位于湖南省张家界市是中国第一个国家森林公园也是世界自然遗产地
基本信息
- 面积130平方公里
- 最高峰黄石寨海拔1200米
- 气候亚热带季风气候
- 最佳游览时间春秋两季
主要景点
1. 黄石寨 - "不到黄石寨,枉到张家界"之说
2. 金鞭溪 - 世界上最美丽的峡谷之一
3. 袁家界 - 电影阿凡达拍摄地
4. 天子山 - "峰林之王"的美誉
门票价格
- 成人票2484天有效
- 学生票163
- 老人票16365岁以上
- 儿童1.3米以下免费
交通指南
- 飞机张家界荷花机场距市区5公里
- 火车张家界火车站
- 汽车长沙广州等地有直达班车
住宿推荐
- 景区内天子山袁家界有农家乐
- 市区内各档次酒店齐全
游览贴士
1. 景区较大建议安排2-3天游览
2. 山路较多建议穿舒适的运动鞋
3. 天气多变建议携带雨具
4. 保护环境不要乱扔垃圾
""")
# 创建价格信息CSV
price_csv = sample_dir / "pricing.csv"
with open(price_csv, 'w', encoding='utf-8') as f:
f.write("""项目,价格,说明
门票,248,4天有效期
索道,单程65元,往返118元
观光电梯,单程72元,往返144元
小火车,单程40元,往返80元
导游服务,150/,专业导游
餐饮,人均80-120,农家菜为主
住宿,100-500/,根据档次不同""")
print(f"✅ 示例文档创建完成: {sample_dir}")
# 4. 处理文档并转换为不同格式
# 4a. 转换为景区标准信息格式
print("\n🏞️ 转换为景区标准信息格式...")
attraction_result = await document_processor.process_documents(
sources=sample_dir,
target_format="attraction_standard",
additional_requirements="重点突出景区特色和实用信息"
)
# 保存结果
output_dir = Path("output/document_processing")
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_dir / "attraction_standard.txt", 'w', encoding='utf-8') as f:
f.write(attraction_result['final_output'])
print("✅ 景区标准信息已生成并保存")
# 4b. 转换为旅游攻略格式
print("\n🗺️ 转换为旅游攻略格式...")
guide_result = await document_processor.process_documents(
sources=sample_dir,
target_format="travel_guide",
additional_requirements="重点介绍游览路线和实用贴士"
)
with open(output_dir / "travel_guide.txt", 'w', encoding='utf-8') as f:
f.write(guide_result['final_output'])
print("✅ 旅游攻略已生成并保存")
# 4c. 转换为营销文案格式
print("\n📢 转换为营销文案格式...")
marketing_result = await document_processor.process_documents(
sources=sample_dir,
target_format="marketing_copy",
additional_requirements="突出景区独特性和吸引力,激发游客兴趣"
)
with open(output_dir / "marketing_copy.txt", 'w', encoding='utf-8') as f:
f.write(marketing_result['final_output'])
print("✅ 营销文案已生成并保存")
# 4d. 转换为结构化数据格式
print("\n📊 转换为结构化数据格式...")
structured_result = await document_processor.process_documents(
sources=sample_dir,
target_format="structured_data",
additional_requirements="提取所有关键数据,便于系统处理"
)
with open(output_dir / "structured_data.json", 'w', encoding='utf-8') as f:
import json
if structured_result['transformed_content']['structured_data']:
json.dump(
structured_result['transformed_content']['structured_data'],
f,
ensure_ascii=False,
indent=2
)
else:
f.write(structured_result['final_output'])
print("✅ 结构化数据已生成并保存")
# 5. 显示处理统计
print("\n📊 处理统计:")
for format_name, result in [
("景区标准信息", attraction_result),
("旅游攻略", guide_result),
("营销文案", marketing_result),
("结构化数据", structured_result)
]:
print(f" {format_name}:")
print(f" - 处理文档数: {result['processing_summary']['total_documents']}")
print(f" - 输出长度: {len(result['final_output'])} 字符")
print(f" - 质量评分: {result['transformed_content']['quality_score']:.2f}")
print(f"\n📁 所有结果已保存至: {output_dir}")
async def advanced_document_processing_example():
"""高级文档处理示例"""
print("\n🚀 高级文档处理示例")
print("=" * 40)
# 创建配置
config = create_default_config(resource_base_directory="./resource")
pipeline = create_document_pipeline(config)
# 分步骤处理示例
print("\n📝 分步骤处理示例...")
# 步骤1仅提取文档
text_extractor = pipeline["text_extractor"]
sample_file = Path("sample_documents/attraction_info.txt")
if sample_file.exists():
extracted_doc = text_extractor.extract_from_file(sample_file)
print(f"✅ 文档提取完成:")
print(f" - 文件: {extracted_doc.filename}")
print(f" - 内容长度: {len(extracted_doc.content)} 字符")
print(f" - 词数: {extracted_doc.metadata.get('word_count', 0)}")
# 步骤2仅内容整合
content_integrator = pipeline["content_integrator"]
integrated_content = content_integrator.integrate_documents([extracted_doc])
print(f"✅ 内容整合完成:")
print(f" - 关键主题: {integrated_content.key_topics}")
print(f" - 内容摘要长度: {len(integrated_content.content_summary)} 字符")
# 步骤3仅格式转换
content_transformer = pipeline["content_transformer"]
# 尝试多种格式
formats = ['summary', 'blog_post', 'faq']
for fmt in formats:
try:
transformed = await content_transformer.transform_content(
integrated_content=integrated_content,
format_type=fmt
)
print(f"{fmt} 格式转换完成,质量评分: {transformed.quality_score:.2f}")
except Exception as e:
print(f"{fmt} 格式转换失败: {e}")
async def batch_processing_example():
"""批量处理示例"""
print("\n📦 批量处理示例")
print("=" * 40)
config = create_default_config(resource_base_directory="./resource")
pipeline = create_document_pipeline(config)
document_processor = pipeline["document_processor"]
# 创建多个测试目录
test_dirs = []
for i in range(2):
test_dir = Path(f"batch_test_{i+1}")
test_dir.mkdir(exist_ok=True)
# 在每个目录创建测试文件
with open(test_dir / f"info_{i+1}.txt", 'w', encoding='utf-8') as f:
f.write(f"这是批量测试目录 {i+1} 的内容。\n包含一些测试信息用于验证批量处理功能。")
test_dirs.append(test_dir)
try:
# 执行批量处理
batch_results = await document_processor.batch_process_directories(
directories=test_dirs,
target_format='summary'
)
print(f"✅ 批量处理完成,处理了 {len(batch_results)} 个目录")
for dir_name, result in batch_results.items():
if 'error' in result:
print(f"{dir_name}: {result['error']}")
else:
print(f"{dir_name}: 成功处理")
finally:
# 清理测试目录
for test_dir in test_dirs:
for file in test_dir.iterdir():
file.unlink()
test_dir.rmdir()
def show_supported_formats():
"""显示支持的格式"""
print("\n📋 支持的格式")
print("=" * 40)
config = create_default_config()
pipeline = create_document_pipeline(config)
document_processor = pipeline["document_processor"]
formats = document_processor.get_supported_formats()
print("📥 支持的输入格式:")
for ext, desc in formats['input_formats'].items():
print(f" {ext}: {desc}")
print("\n📤 支持的输出格式:")
for fmt, desc in formats['output_formats'].items():
print(f" {fmt}: {desc}")
async def main():
"""主函数"""
print("🚀 文档处理系统使用示例")
print("📖 本示例展示了重构后的文档处理功能")
print("\n包含的新功能:")
print(" ✨ 多格式文档提取和解析")
print(" ✨ 智能内容整合和主题分析")
print(" ✨ AI驱动的多格式转换")
print(" ✨ 质量评分和处理统计")
print(" ✨ 批量处理和流水线管理")
print(" ✨ 灵活的配置和扩展机制")
print("\n" + "=" * 50)
# 显示支持的格式
show_supported_formats()
# 简单处理示例
await simple_document_processing_example()
# 高级处理示例
await advanced_document_processing_example()
# 批量处理示例
await batch_processing_example()
print("\n" + "=" * 50)
print("🎊 示例演示完成!")
print("\n📁 生成的文件:")
print(" - sample_documents/ - 示例文档目录")
print(" - output/document_processing/ - 处理结果")
print("📖 查看生成的文件了解处理效果")
print("\n💡 使用建议:")
print(" 1. 根据实际需求选择合适的输出格式")
print(" 2. 使用自定义提示词优化转换效果")
print(" 3. 利用批量处理功能提高工作效率")
print(" 4. 关注质量评分,调整处理参数")
if __name__ == "__main__":
asyncio.run(main())