1718 lines
47 KiB
Markdown
1718 lines
47 KiB
Markdown
# 产品分类模块架构设计
|
||
|
||
## 1 模块概述
|
||
|
||
### 1.1 功能描述
|
||
|
||
**功能**:读取产品列表Excel文件,使用通义千问LLM对每个产品进行自动分类。
|
||
|
||
**输入**:
|
||
- 产品列表文件路径(Excel格式):包含产品编号、产品名称、景区名称等字段
|
||
- 产品类别配置文件路径(JSONL格式):包含类别、类型、子类型的分类体系
|
||
|
||
**输出**:
|
||
- 分类结果文件(JSONL格式):包含产品编号及其对应的分类结果(类别、类型、子类型)
|
||
- 分类结果文件(CSV格式,可选):包含原始产品数据及分类结果
|
||
|
||
### 1.2 main.py接口
|
||
|
||
```python
|
||
def main(config: AppConfig) -> None:
|
||
"""
|
||
产品分类主函数
|
||
|
||
Args:
|
||
config: 应用配置对象,包含以下内容:
|
||
- files.input_file: 产品列表Excel文件路径
|
||
- files.category_file: 产品类别配置JSONL文件路径
|
||
- files.output_file: 分类结果JSONL输出文件路径
|
||
- files.csv_output_file: 分类结果CSV输出文件路径(可选)
|
||
- api.api_key: 通义千问API Key,若为None则从环境变量读取
|
||
- api.base_url: 通义千问API的base_url
|
||
- api.model: 使用的模型名称
|
||
- api.mode: 调用模式(batch/direct)
|
||
|
||
Returns:
|
||
None
|
||
|
||
Raises:
|
||
FileNotFoundError: 输入文件不存在
|
||
ValueError: 文件格式错误或API调用失败
|
||
"""
|
||
pass
|
||
```
|
||
|
||
### 1.3 调用方式
|
||
|
||
命令行调用示例:
|
||
|
||
```bash
|
||
# 使用配置文件
|
||
python -m src.main --config config/config.yaml
|
||
|
||
# 指定调用模式
|
||
python -m src.main --mode batch # Batch批量模式
|
||
python -m src.main --mode direct # 直接请求模式
|
||
|
||
# 指定CSV输出
|
||
python -m src.main --csv-output-file output/result.csv
|
||
```
|
||
|
||
## 2 架构设计
|
||
|
||
### 2.1 项目结构
|
||
|
||
```
|
||
src/
|
||
├── core/ # 核心实现层
|
||
│ ├── __init__.py
|
||
│ ├── file_handler.py # 文件处理实现
|
||
│ ├── batch_client.py # Batch模式API客户端
|
||
│ ├── direct_client.py # Direct模式API客户端
|
||
│ ├── client_factory.py # 客户端工厂
|
||
│ └── prompt_builder.py # Prompt构建实现
|
||
├── models.py # 数据模型定义
|
||
├── interfaces.py # 抽象接口定义
|
||
├── config.py # 配置管理
|
||
└── main.py # 主程序入口
|
||
```
|
||
|
||
### 2.2 模块职责划分
|
||
|
||
#### 2.2.1 core/file_handler.py
|
||
|
||
文件处理的统一接口和各个专门的处理类:
|
||
|
||
- **FileHandler** - 文件处理统一接口(组合类)
|
||
- **ProductReader** - 产品列表读取实现
|
||
- **CategoryReader** - 类别配置读取实现
|
||
- **BatchFileWriter** - Batch请求文件写入实现
|
||
- **ResultParser** - 结果解析实现
|
||
- **ResultWriter** - 结果写入实现(JSONL)
|
||
- **ResultMerger** - 结果合并实现
|
||
- **CSVWriter** - CSV文件写入实现
|
||
|
||
FileHandler通过组合方式,内部持有上述各专门处理类的实例,对外提供统一接口。
|
||
|
||
#### 2.2.2 core/batch_client.py与core/direct_client.py
|
||
|
||
负责与通义千问API交互,支持两种调用模式:
|
||
|
||
**BatchClient(Batch批量模式)**:
|
||
- 上传请求文件
|
||
- 创建Batch任务
|
||
- 轮询任务状态
|
||
- 下载结果文件
|
||
- 处理API错误
|
||
|
||
**DirectClient(直接请求模式)**:
|
||
- 循环调用API进行实时分类
|
||
- 内置重试机制
|
||
- 错误处理
|
||
|
||
#### 2.2.3 core/client_factory.py
|
||
|
||
客户端工厂类,负责根据配置创建对应的API客户端:
|
||
- 支持Batch和Direct两种模式
|
||
- 统一的创建接口
|
||
- 根据配置自动选择合适的客户端
|
||
|
||
#### 2.2.4 core/prompt_builder.py
|
||
|
||
负责构建LLM的prompt:
|
||
- 根据类别配置生成分类指引
|
||
- 为每个产品构建分类请求prompt
|
||
- 定义输出格式要求
|
||
|
||
#### 2.2.5 models.py
|
||
|
||
定义所有数据结构:
|
||
- 输入/输出数据模型
|
||
- 配置数据模型
|
||
- 内部传递数据模型
|
||
|
||
#### 2.2.6 interfaces.py
|
||
|
||
定义抽象接口,实现依赖倒置:
|
||
- 文件处理接口
|
||
- LLM客户端接口
|
||
- Prompt构建接口
|
||
|
||
#### 2.2.7 config.py
|
||
|
||
配置管理:
|
||
- 读取配置文件
|
||
- 配置验证
|
||
- 环境变量支持
|
||
|
||
#### 2.2.8 main.py
|
||
|
||
主程序编排逻辑,调用各个模块完成任务
|
||
|
||
### 2.3 依赖关系图
|
||
|
||
```mermaid
|
||
graph TD
|
||
main[main.py] --> config[config.py]
|
||
main --> interfaces[interfaces.py]
|
||
main --> models[models.py]
|
||
|
||
interfaces --> file_handler[core/file_handler.py]
|
||
interfaces --> client_factory[core/client_factory.py]
|
||
interfaces --> prompt_builder[core/prompt_builder.py]
|
||
|
||
client_factory --> batch_client[core/batch_client.py]
|
||
client_factory --> direct_client[core/direct_client.py]
|
||
|
||
file_handler --> models
|
||
batch_client --> models
|
||
direct_client --> models
|
||
prompt_builder --> models
|
||
|
||
style main fill:#e1f5ff
|
||
style interfaces fill:#fff4e1
|
||
style models fill:#f0f0f0
|
||
style config fill:#f0f0f0
|
||
```
|
||
|
||
### 2.4 整体业务流程
|
||
|
||
```mermaid
|
||
graph LR
|
||
A[读取产品列表Excel] --> B[读取产品类别配置]
|
||
B --> C[构建系统Prompt]
|
||
C --> D[生成请求列表]
|
||
D --> E{调用模式}
|
||
E -->|Batch模式| F[上传→创建→等待→下载]
|
||
E -->|Direct模式| G[循环调用API]
|
||
F --> H[解析结果]
|
||
G --> H
|
||
H --> I[写入JSONL文件]
|
||
I --> J{是否需要CSV?}
|
||
J -->|是| K[合并原始数据]
|
||
J -->|否| L[结束]
|
||
K --> M[写入CSV文件]
|
||
M --> L
|
||
```
|
||
|
||
## 3 接口设计(interfaces.py)
|
||
|
||
### 3.1 接口类图
|
||
|
||
```mermaid
|
||
classDiagram
|
||
%% 文件处理相关接口
|
||
class IFileHandler {
|
||
<<interface>>
|
||
+read_products(file_path) List~ProductInput~
|
||
+read_products_with_full_data(file_path) tuple
|
||
+read_categories(file_path) List~ProductCategory~
|
||
+write_batch_requests(requests, output_path) str
|
||
+parse_batch_responses(file_content) List~ClassificationResult~
|
||
+write_results(results, output_path) None
|
||
+merge_and_write_csv(products_data, results, output_path) None
|
||
}
|
||
|
||
class IProductReader {
|
||
<<interface>>
|
||
+read(file_path) List~ProductInput~
|
||
+read_with_full_data(file_path) tuple
|
||
}
|
||
|
||
class ICategoryReader {
|
||
<<interface>>
|
||
+read(file_path) List~ProductCategory~
|
||
}
|
||
|
||
class IBatchFileWriter {
|
||
<<interface>>
|
||
+write(requests, output_path) str
|
||
}
|
||
|
||
class IResultParser {
|
||
<<interface>>
|
||
+parse(file_content) List~ClassificationResult~
|
||
}
|
||
|
||
class IResultWriter {
|
||
<<interface>>
|
||
+write(results, output_path) None
|
||
}
|
||
|
||
class IResultMerger {
|
||
<<interface>>
|
||
+merge(products_data, results) List~ProductWithClassification~
|
||
}
|
||
|
||
class ICSVWriter {
|
||
<<interface>>
|
||
+write(data, output_path) None
|
||
}
|
||
|
||
IFileHandler o-- IProductReader
|
||
IFileHandler o-- ICategoryReader
|
||
IFileHandler o-- IBatchFileWriter
|
||
IFileHandler o-- IResultParser
|
||
IFileHandler o-- IResultWriter
|
||
IFileHandler o-- IResultMerger
|
||
IFileHandler o-- ICSVWriter
|
||
|
||
%% LLM客户端相关接口
|
||
class ILLMClient {
|
||
<<interface>>
|
||
+classify_products(products, system_prompt, requests) List~ClassificationResult~
|
||
}
|
||
|
||
class IBatchClient {
|
||
<<interface>>
|
||
+upload_file(file_path) str
|
||
+create_batch(input_file_id, endpoint, completion_window) str
|
||
+get_batch_status(batch_id) BatchTaskInfo
|
||
+download_result(file_id) str
|
||
+wait_for_completion(batch_id, poll_interval) BatchTaskInfo
|
||
}
|
||
|
||
%% Prompt构建接口
|
||
class IPromptBuilder {
|
||
<<interface>>
|
||
+build_system_prompt(categories) str
|
||
+build_user_prompt(product) str
|
||
+build_classification_request(product, system_prompt) dict
|
||
}
|
||
```
|
||
|
||
### 3.2 IFileHandler接口
|
||
|
||
文件处理统一接口,通过组合方式整合各个专门的文件处理类。
|
||
|
||
```python
|
||
class IFileHandler(ABC):
|
||
"""文件处理统一接口"""
|
||
|
||
def __init__(
|
||
self,
|
||
product_reader: IProductReader,
|
||
category_reader: ICategoryReader,
|
||
batch_file_writer: IBatchFileWriter,
|
||
result_parser: IResultParser,
|
||
result_writer: IResultWriter,
|
||
result_merger: IResultMerger,
|
||
csv_writer: ICSVWriter
|
||
):
|
||
"""
|
||
初始化文件处理器
|
||
|
||
Args:
|
||
product_reader: 产品读取器
|
||
category_reader: 类别读取器
|
||
batch_file_writer: Batch文件写入器
|
||
result_parser: 结果解析器
|
||
result_writer: 结果写入器
|
||
result_merger: 结果合并器
|
||
csv_writer: CSV写入器
|
||
"""
|
||
pass
|
||
|
||
def read_products(self, file_path: str) -> List[ProductInput]:
|
||
"""
|
||
读取产品列表
|
||
|
||
Args:
|
||
file_path: Excel文件路径
|
||
|
||
Returns:
|
||
List[ProductInput]: 产品列表
|
||
"""
|
||
pass
|
||
|
||
def read_products_with_full_data(
|
||
self,
|
||
file_path: str
|
||
) -> tuple[List[ProductInput], List[Dict[str, Any]]]:
|
||
"""
|
||
读取产品列表和完整原始数据
|
||
|
||
Args:
|
||
file_path: Excel文件路径
|
||
|
||
Returns:
|
||
tuple: (产品列表, 完整原始数据)
|
||
"""
|
||
pass
|
||
|
||
def read_categories(self, file_path: str) -> List[ProductCategory]:
|
||
"""
|
||
读取类别配置
|
||
|
||
Args:
|
||
file_path: JSONL文件路径
|
||
|
||
Returns:
|
||
List[ProductCategory]: 类别列表
|
||
"""
|
||
pass
|
||
|
||
def write_batch_requests(
|
||
self,
|
||
requests: List[dict],
|
||
output_path: str
|
||
) -> str:
|
||
"""
|
||
写入Batch请求
|
||
|
||
Args:
|
||
requests: Batch请求列表
|
||
output_path: 输出文件路径
|
||
|
||
Returns:
|
||
str: 生成的文件路径
|
||
"""
|
||
pass
|
||
|
||
def parse_batch_responses(self, file_content: str) -> List[ClassificationResult]:
|
||
"""
|
||
解析Batch响应
|
||
|
||
Args:
|
||
file_content: JSONL文件内容字符串
|
||
|
||
Returns:
|
||
List[ClassificationResult]: 分类结果列表
|
||
"""
|
||
pass
|
||
|
||
def write_results(
|
||
self,
|
||
results: List[ClassificationResult],
|
||
output_path: str
|
||
) -> None:
|
||
"""
|
||
写入分类结果
|
||
|
||
Args:
|
||
results: 分类结果列表
|
||
output_path: 输出文件路径
|
||
"""
|
||
pass
|
||
|
||
def merge_and_write_csv(
|
||
self,
|
||
products_with_data: List[Dict[str, Any]],
|
||
classification_results: List[ClassificationResult],
|
||
output_path: str
|
||
) -> None:
|
||
"""
|
||
合并数据并写入CSV文件
|
||
|
||
Args:
|
||
products_with_data: 完整原始数据
|
||
classification_results: 分类结果
|
||
output_path: CSV输出路径
|
||
"""
|
||
pass
|
||
```
|
||
|
||
### 3.3 专门文件处理接口
|
||
|
||
#### 3.3.1 IProductReader接口
|
||
|
||
```python
|
||
class IProductReader(ABC):
|
||
"""产品列表读取接口"""
|
||
|
||
@abstractmethod
|
||
def read(self, file_path: str) -> List[ProductInput]:
|
||
"""
|
||
读取产品列表Excel文件
|
||
|
||
Args:
|
||
file_path: Excel文件路径
|
||
|
||
Returns:
|
||
List[ProductInput]: 产品列表
|
||
|
||
Raises:
|
||
FileNotFoundError: 文件不存在
|
||
ValueError: 文件格式错误或缺少必要列
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def read_with_full_data(self, file_path: str) -> tuple[List[ProductInput], List[Dict[str, Any]]]:
|
||
"""
|
||
读取产品列表和完整原始数据
|
||
|
||
Args:
|
||
file_path: Excel文件路径
|
||
|
||
Returns:
|
||
tuple: (产品列表, 完整原始数据列表)
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.2 ICategoryReader接口
|
||
|
||
```python
|
||
class ICategoryReader(ABC):
|
||
"""类别配置读取接口"""
|
||
|
||
@abstractmethod
|
||
def read(self, file_path: str) -> List[ProductCategory]:
|
||
"""
|
||
读取产品类别配置JSONL文件
|
||
|
||
Args:
|
||
file_path: JSONL文件路径
|
||
|
||
Returns:
|
||
List[ProductCategory]: 类别列表
|
||
|
||
Raises:
|
||
FileNotFoundError: 文件不存在
|
||
ValueError: JSON格式错误
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.3 IBatchFileWriter接口
|
||
|
||
```python
|
||
class IBatchFileWriter(ABC):
|
||
"""Batch请求文件写入接口"""
|
||
|
||
@abstractmethod
|
||
def write(
|
||
self,
|
||
requests: List[dict],
|
||
output_path: str
|
||
) -> str:
|
||
"""
|
||
将Batch请求写入JSONL文件
|
||
|
||
Args:
|
||
requests: Batch请求列表
|
||
output_path: 输出文件路径
|
||
|
||
Returns:
|
||
str: 生成的文件路径
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.4 IResultParser接口
|
||
|
||
```python
|
||
class IResultParser(ABC):
|
||
"""结果解析接口"""
|
||
|
||
@abstractmethod
|
||
def parse(self, file_content: str) -> List[ClassificationResult]:
|
||
"""
|
||
解析Batch响应JSONL内容
|
||
|
||
Args:
|
||
file_content: JSONL文件内容字符串
|
||
|
||
Returns:
|
||
List[ClassificationResult]: 分类结果列表
|
||
|
||
Raises:
|
||
ValueError: 解析错误
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.5 IResultWriter接口
|
||
|
||
```python
|
||
class IResultWriter(ABC):
|
||
"""结果写入接口"""
|
||
|
||
@abstractmethod
|
||
def write(
|
||
self,
|
||
results: List[ClassificationResult],
|
||
output_path: str
|
||
) -> None:
|
||
"""
|
||
写入分类结果到JSONL文件
|
||
|
||
Args:
|
||
results: 分类结果列表
|
||
output_path: 输出文件路径
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.6 IResultMerger接口
|
||
|
||
```python
|
||
class IResultMerger(ABC):
|
||
"""结果合并接口"""
|
||
|
||
@abstractmethod
|
||
def merge(
|
||
self,
|
||
products_with_data: List[Dict[str, Any]],
|
||
classification_results: List[ClassificationResult]
|
||
) -> List[ProductWithClassification]:
|
||
"""
|
||
合并原始产品数据和分类结果
|
||
|
||
Args:
|
||
products_with_data: 原始产品数据列表(包含所有Excel字段)
|
||
classification_results: 分类结果列表
|
||
|
||
Returns:
|
||
List[ProductWithClassification]: 合并后的完整数据列表
|
||
|
||
Raises:
|
||
ValueError: 数据不匹配或缺少必要字段
|
||
"""
|
||
pass
|
||
```
|
||
|
||
#### 3.3.7 ICSVWriter接口
|
||
|
||
```python
|
||
class ICSVWriter(ABC):
|
||
"""CSV写入接口"""
|
||
|
||
@abstractmethod
|
||
def write(
|
||
self,
|
||
data: List[ProductWithClassification],
|
||
output_path: str
|
||
) -> None:
|
||
"""
|
||
写入完整数据到CSV文件
|
||
|
||
Args:
|
||
data: 完整产品数据列表
|
||
output_path: 输出文件路径
|
||
"""
|
||
pass
|
||
```
|
||
|
||
### 3.4 ILLMClient接口
|
||
|
||
LLM客户端统一接口,采用策略模式封装两种调用方式。
|
||
|
||
```python
|
||
class ILLMClient(ABC):
|
||
"""LLM客户端统一接口(策略模式)"""
|
||
|
||
@abstractmethod
|
||
def classify_products(
|
||
self,
|
||
products: List[ProductInput],
|
||
system_prompt: str,
|
||
requests: List[dict]
|
||
) -> List[ClassificationResult]:
|
||
"""
|
||
对产品列表进行分类(统一接口)
|
||
|
||
Args:
|
||
products: 产品列表
|
||
system_prompt: 系统提示词
|
||
requests: 预构建的请求列表(Batch格式)
|
||
|
||
Returns:
|
||
List[ClassificationResult]: 分类结果列表
|
||
|
||
Raises:
|
||
APIError: API调用失败
|
||
TimeoutError: 超时(仅Batch模式)
|
||
"""
|
||
pass
|
||
```
|
||
|
||
### 3.5 IPromptBuilder接口
|
||
|
||
```python
|
||
class IPromptBuilder(ABC):
|
||
"""Prompt构建接口"""
|
||
|
||
@abstractmethod
|
||
def build_system_prompt(self, categories: List[ProductCategory]) -> str:
|
||
"""
|
||
构建系统提示词
|
||
|
||
Args:
|
||
categories: 产品类别列表
|
||
|
||
Returns:
|
||
str: 系统提示词
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def build_user_prompt(self, product: ProductInput) -> str:
|
||
"""
|
||
构建用户提示词
|
||
|
||
Args:
|
||
product: 产品信息
|
||
|
||
Returns:
|
||
str: 用户提示词
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def build_classification_request(
|
||
self,
|
||
product: ProductInput,
|
||
system_prompt: str,
|
||
model: str = "qwen-flash"
|
||
) -> dict:
|
||
"""
|
||
构建完整的分类请求
|
||
|
||
Args:
|
||
product: 产品信息
|
||
system_prompt: 系统提示词
|
||
model: 模型名称
|
||
|
||
Returns:
|
||
dict: Batch请求格式的字典
|
||
"""
|
||
pass
|
||
```
|
||
|
||
## 4 数据模型(models.py)
|
||
|
||
### 4.1 输入数据模型
|
||
|
||
```python
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional, Dict, Any, List
|
||
|
||
class ProductInput(BaseModel):
|
||
"""产品输入数据"""
|
||
product_id: str = Field(..., description="产品编号")
|
||
product_name: str = Field(..., description="产品名称")
|
||
scenic_spot: str = Field(..., description="景区名称")
|
||
|
||
class ProductCategory(BaseModel):
|
||
"""产品类别定义"""
|
||
category: str = Field(..., description="类别,如:门票、住宿、餐饮")
|
||
type: str = Field(..., description="类型")
|
||
sub_type: str = Field(default="", description="子类型")
|
||
```
|
||
|
||
### 4.2 输出数据模型
|
||
|
||
```python
|
||
class ClassificationResult(BaseModel):
|
||
"""分类结果"""
|
||
product_id: str = Field(..., description="产品编号")
|
||
category: str = Field(..., description="类别")
|
||
type: str = Field(..., description="类型")
|
||
sub_type: str = Field(default="", description="子类型")
|
||
|
||
class ProductWithClassification(BaseModel):
|
||
"""完整的产品数据(原始数据+分类结果)"""
|
||
original_data: Dict[str, Any] = Field(..., description="原始Excel中的所有字段数据")
|
||
category: str = Field(..., description="类别")
|
||
type: str = Field(..., description="类型")
|
||
sub_type: str = Field(default="", description="子类型")
|
||
```
|
||
|
||
### 4.3 内部数据模型
|
||
|
||
```python
|
||
class BatchRequest(BaseModel):
|
||
"""Batch API请求格式"""
|
||
custom_id: str = Field(..., description="自定义请求ID,对应产品编号")
|
||
method: str = Field(default="POST", description="请求方法")
|
||
url: str = Field(default="/v1/chat/completions", description="API路径")
|
||
body: dict = Field(..., description="请求体")
|
||
|
||
class BatchResponse(BaseModel):
|
||
"""Batch API响应格式"""
|
||
id: str = Field(..., description="请求ID")
|
||
custom_id: str = Field(..., description="自定义请求ID")
|
||
response: dict = Field(..., description="响应内容")
|
||
error: Optional[dict] = Field(default=None, description="错误信息")
|
||
|
||
class BatchTaskInfo(BaseModel):
|
||
"""Batch任务信息"""
|
||
task_id: str = Field(..., description="Batch任务ID")
|
||
status: str = Field(..., description="任务状态")
|
||
input_file_id: str = Field(..., description="输入文件ID")
|
||
output_file_id: Optional[str] = Field(default=None, description="输出文件ID")
|
||
error_file_id: Optional[str] = Field(default=None, description="错误文件ID")
|
||
```
|
||
|
||
### 4.4 配置数据模型
|
||
|
||
```python
|
||
from enum import Enum
|
||
|
||
class LLMClientMode(str, Enum):
|
||
"""LLM客户端模式"""
|
||
BATCH = "batch"
|
||
DIRECT = "direct"
|
||
|
||
class ApiConfig(BaseModel):
|
||
"""API配置"""
|
||
mode: LLMClientMode = Field(
|
||
default=LLMClientMode.BATCH,
|
||
description="调用模式:batch(批量)或direct(直接)"
|
||
)
|
||
api_key: Optional[str] = Field(default=None, description="API密钥")
|
||
base_url: str = Field(..., description="API基础URL")
|
||
model: str = Field(..., description="模型名称")
|
||
completion_window: str = Field(default="24h", description="Batch模式等待时间")
|
||
max_retries: int = Field(default=3, description="Direct模式最大重试次数")
|
||
retry_delay: int = Field(default=1, description="Direct模式重试延迟(秒)")
|
||
|
||
class FilesConfig(BaseModel):
|
||
"""文件配置"""
|
||
input_file: str = Field(..., description="产品列表Excel文件路径")
|
||
category_file: str = Field(..., description="产品类别配置JSONL文件路径")
|
||
output_file: str = Field(..., description="分类结果JSONL输出文件路径")
|
||
csv_output_file: Optional[str] = Field(
|
||
default=None,
|
||
description="分类结果CSV输出文件路径(可选)"
|
||
)
|
||
temp_request_file: str = Field(
|
||
default="temp_batch_requests.jsonl",
|
||
description="临时请求文件路径"
|
||
)
|
||
|
||
class LoggingConfig(BaseModel):
|
||
"""日志配置"""
|
||
level: str = Field(default="INFO", description="日志级别")
|
||
format: str = Field(
|
||
default="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||
description="日志格式"
|
||
)
|
||
file: str = Field(default="logs/classifier.log", description="日志文件路径")
|
||
|
||
class BatchConfig(BaseModel):
|
||
"""Batch配置"""
|
||
poll_interval: int = Field(default=60, description="轮询间隔(秒)")
|
||
max_wait_time: int = Field(default=86400, description="最大等待时间(秒)")
|
||
|
||
class AppConfig(BaseModel):
|
||
"""应用配置"""
|
||
api: ApiConfig
|
||
files: FilesConfig
|
||
logging: LoggingConfig
|
||
batch: BatchConfig
|
||
```
|
||
|
||
## 5 核心实现(core/)
|
||
|
||
### 5.1 file_handler.py
|
||
|
||
#### 5.1.1 ProductReader类
|
||
|
||
**功能**:读取Excel格式的产品列表
|
||
|
||
**算法流程**:
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> ReadExcel[使用pandas读取Excel]
|
||
ReadExcel --> CheckColumns{检查必要列}
|
||
CheckColumns -->|缺少| Error[抛出ValueError]
|
||
CheckColumns -->|完整| ParseRows[遍历每一行]
|
||
ParseRows --> CreateProduct[创建ProductInput对象]
|
||
CreateProduct --> SaveFullData{是否需要完整数据?}
|
||
SaveFullData -->|是| ToDict[转换为字典保存]
|
||
SaveFullData -->|否| Skip[跳过]
|
||
ToDict --> AddToList[添加到列表]
|
||
Skip --> AddToList
|
||
AddToList --> HasMore{还有行?}
|
||
HasMore -->|是| ParseRows
|
||
HasMore -->|否| Return([返回结果])
|
||
Error --> End([结束])
|
||
```
|
||
|
||
**方法**:
|
||
|
||
```python
|
||
def read(self, file_path: str) -> List[ProductInput]:
|
||
"""读取产品列表"""
|
||
pass
|
||
|
||
def read_with_full_data(self, file_path: str) -> tuple[List[ProductInput], List[Dict[str, Any]]]:
|
||
"""读取产品列表和完整原始数据"""
|
||
pass
|
||
```
|
||
|
||
#### 5.1.2 ResultMerger类
|
||
|
||
**功能**:合并原始产品数据和分类结果
|
||
|
||
**算法流程**:
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> BuildDict[构建分类结果字典<br/>key: product_id]
|
||
BuildDict --> LoopProducts[遍历原始产品数据]
|
||
LoopProducts --> GetID[获取产品编号]
|
||
GetID --> LookupResult{查找分类结果}
|
||
LookupResult -->|找到| Merge1[合并原始数据+分类]
|
||
LookupResult -->|未找到| Merge2[原始数据+默认分类]
|
||
Merge1 --> AddResult[添加到结果列表]
|
||
Merge2 --> AddResult
|
||
AddResult --> HasMore{还有产品?}
|
||
HasMore -->|是| LoopProducts
|
||
HasMore -->|否| Return([返回合并结果])
|
||
```
|
||
|
||
#### 5.1.3 CSVWriter类
|
||
|
||
**功能**:将完整数据写入CSV文件
|
||
|
||
**算法流程**:
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> GetColumns[获取所有列名<br/>原始列+分类列]
|
||
GetColumns --> BuildRows[构建数据行]
|
||
BuildRows --> LoopData[遍历每条数据]
|
||
LoopData --> MergeRow[合并原始字段+分类字段]
|
||
MergeRow --> AddRow[添加到行列表]
|
||
AddRow --> HasMore{还有数据?}
|
||
HasMore -->|是| LoopData
|
||
HasMore -->|否| CreateDF[创建DataFrame]
|
||
CreateDF --> WriteCSV[写入CSV文件<br/>utf-8-sig编码]
|
||
WriteCSV --> End([结束])
|
||
```
|
||
|
||
### 5.2 batch_client.py与direct_client.py
|
||
|
||
#### 5.2.1 架构类图
|
||
|
||
```mermaid
|
||
classDiagram
|
||
class ILLMClient {
|
||
<<interface>>
|
||
+classify_products(products, system_prompt, requests) List~ClassificationResult~
|
||
}
|
||
|
||
class BatchClient {
|
||
-_client: OpenAI
|
||
-_logger: Logger
|
||
+classify_products(...) List~ClassificationResult~
|
||
-_upload_file(file_path) str
|
||
-_create_batch(...) str
|
||
-_wait_for_completion(...) BatchTaskInfo
|
||
-_download_result(file_id) str
|
||
}
|
||
|
||
class DirectClient {
|
||
-_client: OpenAI
|
||
-_model: str
|
||
-_logger: Logger
|
||
-_max_retries: int
|
||
+classify_products(...) List~ClassificationResult~
|
||
-_classify_single_product(...) ClassificationResult
|
||
-_parse_response(...) ClassificationResult
|
||
}
|
||
|
||
ILLMClient <|-- BatchClient : 实现
|
||
ILLMClient <|-- DirectClient : 实现
|
||
```
|
||
|
||
#### 5.2.2 BatchClient算法流程
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> WriteFile[写入请求文件]
|
||
WriteFile --> Upload[上传文件]
|
||
Upload --> CreateBatch[创建Batch任务]
|
||
CreateBatch --> Poll[轮询任务状态]
|
||
Poll --> CheckStatus{检查状态}
|
||
CheckStatus -->|进行中| Wait[等待poll_interval]
|
||
Wait --> Poll
|
||
CheckStatus -->|completed| Download[下载结果文件]
|
||
CheckStatus -->|failed| Error1[抛出错误]
|
||
CheckStatus -->|expired| Error2[抛出超时错误]
|
||
Download --> Parse[解析结果]
|
||
Parse --> Cleanup[清理临时文件]
|
||
Cleanup --> Return([返回分类结果])
|
||
Error1 --> End([结束])
|
||
Error2 --> End
|
||
```
|
||
|
||
#### 5.2.3 DirectClient算法流程
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> InitResults[初始化结果列表]
|
||
InitResults --> LoopProducts[遍历产品列表]
|
||
LoopProducts --> CallAPI[调用chat.completions.create]
|
||
CallAPI --> CheckResponse{调用成功?}
|
||
CheckResponse -->|成功| ParseResponse[解析响应]
|
||
CheckResponse -->|失败| CheckRetry{是否还能重试?}
|
||
CheckRetry -->|是| Wait[等待retry_delay]
|
||
Wait --> CallAPI
|
||
CheckRetry -->|否| LogError[记录错误]
|
||
LogError --> Skip[跳过该产品]
|
||
ParseResponse --> AddResult[添加到结果列表]
|
||
Skip --> HasMore{还有产品?}
|
||
AddResult --> HasMore
|
||
HasMore -->|是| LoopProducts
|
||
HasMore -->|否| Return([返回分类结果])
|
||
```
|
||
|
||
### 5.3 client_factory.py
|
||
|
||
#### 5.3.1 工厂模式类图
|
||
|
||
```mermaid
|
||
classDiagram
|
||
class LLMClientFactory {
|
||
<<factory>>
|
||
+create_client(mode, ...) ILLMClient
|
||
}
|
||
|
||
class LLMClientMode {
|
||
<<enumeration>>
|
||
BATCH
|
||
DIRECT
|
||
}
|
||
|
||
class ILLMClient {
|
||
<<interface>>
|
||
+classify_products(...) List~ClassificationResult~
|
||
}
|
||
|
||
class BatchClient {
|
||
+classify_products(...) List~ClassificationResult~
|
||
}
|
||
|
||
class DirectClient {
|
||
+classify_products(...) List~ClassificationResult~
|
||
}
|
||
|
||
LLMClientFactory ..> LLMClientMode : 使用
|
||
LLMClientFactory ..> ILLMClient : 创建
|
||
LLMClientFactory ..> BatchClient : 创建
|
||
LLMClientFactory ..> DirectClient : 创建
|
||
ILLMClient <|-- BatchClient : 实现
|
||
ILLMClient <|-- DirectClient : 实现
|
||
```
|
||
|
||
#### 5.3.2 工厂方法
|
||
|
||
```python
|
||
@staticmethod
|
||
def create_client(
|
||
mode: LLMClientMode,
|
||
api_key: str,
|
||
base_url: str,
|
||
model: str,
|
||
logger: logging.Logger,
|
||
completion_window: str = "24h",
|
||
max_retries: int = 3
|
||
) -> ILLMClient:
|
||
"""
|
||
根据模式创建对应的LLM客户端
|
||
|
||
Args:
|
||
mode: 客户端模式(batch或direct)
|
||
api_key: API密钥
|
||
base_url: API基础URL
|
||
model: 模型名称
|
||
logger: 日志记录器
|
||
completion_window: Batch模式的等待时间(仅Batch模式使用)
|
||
max_retries: 最大重试次数(仅Direct模式使用)
|
||
|
||
Returns:
|
||
ILLMClient: 对应模式的客户端实例
|
||
|
||
Raises:
|
||
ValueError: 不支持的模式
|
||
"""
|
||
pass
|
||
```
|
||
|
||
### 5.4 prompt_builder.py
|
||
|
||
#### 5.4.1 Prompt构建流程
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> BuildSystem[构建系统Prompt]
|
||
BuildSystem --> FormatCategories[格式化类别列表]
|
||
FormatCategories --> AddInstructions[添加分类指令]
|
||
AddInstructions --> AddOutputFormat[添加输出格式要求]
|
||
AddOutputFormat --> SystemReady([系统Prompt完成])
|
||
|
||
Start2([为每个产品]) --> BuildUser[构建用户Prompt]
|
||
BuildUser --> ExtractInfo[提取产品信息]
|
||
ExtractInfo --> FormatProduct[格式化产品描述]
|
||
FormatProduct --> UserReady([用户Prompt完成])
|
||
|
||
SystemReady --> Combine[组合系统+用户Prompt]
|
||
UserReady --> Combine
|
||
Combine --> BuildRequest[构建Batch请求格式]
|
||
BuildRequest --> End([完成])
|
||
```
|
||
|
||
#### 5.4.2 Prompt模板
|
||
|
||
**系统提示词模板**:
|
||
|
||
```python
|
||
def build_system_prompt(self, categories: List[ProductCategory]) -> str:
|
||
"""
|
||
你是一个产品分类专家。请根据以下产品类别体系对产品进行分类:
|
||
|
||
{格式化的类别列表}
|
||
|
||
请严格按照以下JSON格式输出分类结果:
|
||
{
|
||
"category": "类别",
|
||
"type": "类型",
|
||
"sub_type": "子类型"
|
||
}
|
||
|
||
分类原则:
|
||
1. 必须从提供的类别体系中选择最合适的分类
|
||
2. 如果没有合适的子类型,sub_type可以为空字符串
|
||
3. 只输出JSON格式,不要包含其他说明文字
|
||
"""
|
||
pass
|
||
```
|
||
|
||
**用户提示词模板**:
|
||
|
||
```python
|
||
def build_user_prompt(self, product: ProductInput) -> str:
|
||
"""
|
||
请对以下产品进行分类:
|
||
|
||
产品编号: {product.product_id}
|
||
产品名称: {product.product_name}
|
||
景区名称: {product.scenic_spot}
|
||
|
||
请输出分类结果。
|
||
"""
|
||
pass
|
||
```
|
||
|
||
## 6 主程序逻辑(main.py)
|
||
|
||
### 6.1 主流程图
|
||
|
||
```mermaid
|
||
graph TD
|
||
Start([开始]) --> Init[初始化配置和依赖]
|
||
Init --> CreateFactory[使用工厂创建LLM客户端]
|
||
CreateFactory --> CheckCSV{是否需要CSV输出?}
|
||
CheckCSV -->|是| ReadFull[读取产品列表<br/>含完整数据]
|
||
CheckCSV -->|否| ReadSimple[读取产品列表]
|
||
ReadFull --> ReadCategories
|
||
ReadSimple --> ReadCategories[读取类别配置]
|
||
ReadCategories --> BuildPrompt[构建系统Prompt]
|
||
BuildPrompt --> GenRequests[生成请求列表]
|
||
GenRequests --> Classify[调用客户端classify_products]
|
||
Classify --> WriteJSONL[写入JSONL结果]
|
||
WriteJSONL --> CheckCSV2{是否需要CSV?}
|
||
CheckCSV2 -->|是| MergeData[合并原始数据和分类结果]
|
||
CheckCSV2 -->|否| LogStats
|
||
MergeData --> WriteCSV[写入CSV文件]
|
||
WriteCSV --> LogStats[记录统计信息]
|
||
LogStats --> End([结束])
|
||
```
|
||
|
||
### 6.2 main函数伪代码
|
||
|
||
```python
|
||
def main(config: AppConfig) -> None:
|
||
"""主函数"""
|
||
|
||
# 1. 初始化日志
|
||
logger = setup_logger(config.logging)
|
||
|
||
# 2. 获取API Key
|
||
api_key = config.api.api_key or get_env_api_key()
|
||
|
||
# 3. 初始化文件处理器
|
||
file_handler = FileHandler(
|
||
product_reader=ProductReader(),
|
||
category_reader=CategoryReader(),
|
||
batch_file_writer=BatchFileWriter(),
|
||
result_parser=ResultParser(),
|
||
result_writer=ResultWriter(),
|
||
result_merger=ResultMerger(),
|
||
csv_writer=CSVWriter()
|
||
)
|
||
|
||
# 4. 使用工厂创建LLM客户端
|
||
llm_client = LLMClientFactory.create_client(
|
||
mode=config.api.mode,
|
||
api_key=api_key,
|
||
base_url=config.api.base_url,
|
||
model=config.api.model,
|
||
logger=logger,
|
||
completion_window=config.api.completion_window,
|
||
max_retries=config.api.max_retries
|
||
)
|
||
|
||
# 5. 初始化Prompt构建器
|
||
prompt_builder = PromptBuilder()
|
||
|
||
try:
|
||
# 6. 读取输入数据
|
||
logger.info(f"读取产品列表: {config.files.input_file}")
|
||
|
||
if config.files.csv_output_file:
|
||
products, products_with_data = file_handler.read_products_with_full_data(
|
||
config.files.input_file
|
||
)
|
||
else:
|
||
products = file_handler.read_products(config.files.input_file)
|
||
products_with_data = None
|
||
|
||
logger.info(f"共读取 {len(products)} 个产品")
|
||
|
||
logger.info(f"读取类别配置: {config.files.category_file}")
|
||
categories = file_handler.read_categories(config.files.category_file)
|
||
logger.info(f"共读取 {len(categories)} 个类别")
|
||
|
||
# 7. 构建Prompt和请求
|
||
logger.info("构建系统Prompt")
|
||
system_prompt = prompt_builder.build_system_prompt(categories)
|
||
|
||
logger.info("生成分类请求")
|
||
requests = []
|
||
for product in products:
|
||
request = prompt_builder.build_classification_request(
|
||
product, system_prompt, config.api.model
|
||
)
|
||
requests.append(request)
|
||
logger.info(f"生成 {len(requests)} 个请求")
|
||
|
||
# 8. 使用统一接口进行分类
|
||
logger.info(f"使用 {config.api.mode.value} 模式进行产品分类")
|
||
results = llm_client.classify_products(products, system_prompt, requests)
|
||
|
||
# 9. 写入JSONL输出文件
|
||
logger.info(f"写入分类结果: {config.files.output_file}")
|
||
file_handler.write_results(results, config.files.output_file)
|
||
|
||
# 10. 如果配置了CSV输出,则生成CSV文件
|
||
if config.files.csv_output_file and products_with_data:
|
||
logger.info(f"生成CSV文件: {config.files.csv_output_file}")
|
||
file_handler.merge_and_write_csv(
|
||
products_with_data,
|
||
results,
|
||
config.files.csv_output_file
|
||
)
|
||
logger.info("CSV文件生成完成")
|
||
|
||
# 11. 统计信息
|
||
success_count = len(results)
|
||
failed_count = len(products) - success_count
|
||
logger.info(f"分类完成: 成功 {success_count}, 失败 {failed_count}")
|
||
|
||
if failed_count > 0:
|
||
logger.warning(f"有 {failed_count} 个产品分类失败")
|
||
|
||
logger.info("="*60)
|
||
logger.info("产品分类程序成功完成")
|
||
logger.info("="*60)
|
||
|
||
except FileNotFoundError as e:
|
||
logger.error(f"文件不存在: {e}")
|
||
raise
|
||
except ValueError as e:
|
||
logger.error(f"数据格式错误: {e}")
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"处理过程中发生错误: {e}")
|
||
raise
|
||
```
|
||
|
||
### 6.3 函数调用关系图
|
||
|
||
```mermaid
|
||
graph TD
|
||
main["main()
|
||
---
|
||
功能: 主程序入口
|
||
输入: AppConfig
|
||
输出: None"]
|
||
|
||
setup_logger["setup_logger()
|
||
---
|
||
功能: 初始化日志系统
|
||
输入: LoggingConfig
|
||
输出: Logger"]
|
||
|
||
create_client["LLMClientFactory.create_client()
|
||
---
|
||
功能: 创建LLM客户端
|
||
输入: mode, api_key等
|
||
输出: ILLMClient"]
|
||
|
||
read_products["file_handler.read_products_with_full_data()
|
||
---
|
||
功能: 读取产品列表和完整数据
|
||
输入: file_path
|
||
输出: tuple"]
|
||
|
||
read_categories["file_handler.read_categories()
|
||
---
|
||
功能: 读取类别配置
|
||
输入: file_path
|
||
输出: List~ProductCategory~"]
|
||
|
||
build_system_prompt["prompt_builder.build_system_prompt()
|
||
---
|
||
功能: 构建系统Prompt
|
||
输入: List~ProductCategory~
|
||
输出: str"]
|
||
|
||
build_request["prompt_builder.build_classification_request()
|
||
---
|
||
功能: 构建分类请求
|
||
输入: ProductInput, system_prompt
|
||
输出: dict"]
|
||
|
||
classify["llm_client.classify_products()
|
||
---
|
||
功能: 执行分类
|
||
输入: products, system_prompt, requests
|
||
输出: List~ClassificationResult~"]
|
||
|
||
write_results["file_handler.write_results()
|
||
---
|
||
功能: 写入JSONL结果
|
||
输入: results, output_path
|
||
输出: None"]
|
||
|
||
merge_csv["file_handler.merge_and_write_csv()
|
||
---
|
||
功能: 合并并写入CSV
|
||
输入: products_data, results, path
|
||
输出: None"]
|
||
|
||
main --> setup_logger
|
||
main --> create_client
|
||
main --> read_products
|
||
main --> read_categories
|
||
main --> build_system_prompt
|
||
main --> build_request
|
||
main --> classify
|
||
main --> write_results
|
||
main --> merge_csv
|
||
```
|
||
|
||
## 7 功能扩展
|
||
|
||
### 7.1 直接请求模式扩展
|
||
|
||
#### 7.1.1 扩展背景
|
||
|
||
Batch模式响应较慢(需要上传文件、创建任务、轮询等待、下载结果),在某些场景下需要更快速的直接请求方式。为兼容两种调用方式,对现有架构进行最小化修改。
|
||
|
||
#### 7.1.2 两种调用方式对比
|
||
|
||
| 特性 | Batch模式 | 直接请求模式 |
|
||
|------|----------|-------------|
|
||
| 调用方式 | 文件批量处理 | 实时单次请求 |
|
||
| 响应速度 | 慢(需等待调度) | 快(实时响应) |
|
||
| 费用 | 实时调用的50% | 标准费用 |
|
||
| 适用场景 | 大规模数据处理 | 小批量快速分类 |
|
||
| 流程步骤 | 上传→创建→等待→下载 | 直接调用API |
|
||
|
||
#### 7.1.3 架构修改方案
|
||
|
||
**核心修改点**:
|
||
|
||
1. **新增统一LLM客户端接口**(interfaces.py):使用策略模式封装两种调用方式
|
||
2. **重构BatchClient实现ILLMClient**(core/batch_client.py):将原有方法封装为统一接口
|
||
3. **新增DirectClient实现ILLMClient**(core/direct_client.py):实现直接请求模式
|
||
4. **新增客户端工厂**(core/client_factory.py):根据配置创建对应客户端
|
||
5. **修改main.py适配统一接口**:简化主流程,使用统一接口调用
|
||
|
||
**修改后的优点**:
|
||
- 接口统一:对上层main.py提供一致的调用接口
|
||
- 易于扩展:可轻松添加新的调用模式
|
||
- 配置驱动:无需改代码即可切换模式
|
||
- 保持兼容:默认行为与原有系统一致
|
||
|
||
### 7.2 CSV输出功能扩展
|
||
|
||
#### 7.2.1 扩展背景
|
||
|
||
当前系统输出的是JSONL格式的分类结果,仅包含产品编号和分类信息。实际业务需求是输出包含原始数据和分类结果的CSV文件,便于Excel打开和进一步分析。
|
||
|
||
#### 7.2.2 功能对比
|
||
|
||
| 特性 | 原有实现 | 扩展后实现 |
|
||
|------|---------|-----------|
|
||
| 输出格式 | JSONL | JSONL + CSV(可选) |
|
||
| 输出内容 | 仅分类结果 | 原始数据 + 分类结果 |
|
||
| 字段数量 | 4个(product_id + 3个分类) | Excel原始字段 + 3个分类 |
|
||
| 适用场景 | 程序间数据交换 | 人工查看、Excel分析 |
|
||
|
||
#### 7.2.3 架构扩展方案
|
||
|
||
**核心扩展点**:
|
||
|
||
1. **新增ProductWithClassification数据模型**(models.py):包含原始数据和分类结果
|
||
2. **新增IResultMerger接口**(interfaces.py):定义结果合并抽象
|
||
3. **实现ResultMerger**(core/file_handler.py):合并原始数据和分类结果
|
||
4. **修改ProductReader**(core/file_handler.py):支持读取完整数据
|
||
5. **新增ICSVWriter接口和实现**(core/file_handler.py):定义CSV写入功能
|
||
6. **扩展FileHandler统一接口**(core/file_handler.py):添加CSV相关方法
|
||
7. **修改main.py**(main.py):支持可选的CSV输出
|
||
|
||
**扩展后的优点**:
|
||
- 业务价值高:直接输出可用的Excel文件
|
||
- 数据完整:保留所有原始信息
|
||
- 向后兼容:不影响现有功能,可选启用
|
||
- 易于使用:CSV格式Excel可直接打开
|
||
|
||
#### 7.2.4 输出文件示例
|
||
|
||
**JSONL输出**(classification_result.jsonl):
|
||
```json
|
||
{"product_id":"9256359","category":"住宿","type":"商务酒店","sub_type":""}
|
||
{"product_id":"9262851","category":"住宿","type":"度假酒店","sub_type":""}
|
||
```
|
||
|
||
**CSV输出**(classification_result.csv):
|
||
```csv
|
||
产品编号,产品名称,景区名称,价格,库存,上架时间,类别,类型,子类型
|
||
9256359,某某商务酒店,某景区,299,50,2024-01-01,住宿,商务酒店,
|
||
9262851,某某度假酒店,某景区,599,30,2024-01-02,住宿,度假酒店,
|
||
```
|
||
|
||
## 8 测试设计
|
||
|
||
### 8.1 测试文件结构
|
||
|
||
```
|
||
tests/
|
||
├── __init__.py
|
||
├── conftest.py # pytest配置
|
||
├── fixtures/ # 测试数据
|
||
│ ├── create_test_data.py
|
||
│ ├── sample_products.xlsx
|
||
│ ├── sample_categories.jsonl
|
||
│ └── sample_batch_response.jsonl
|
||
├── test_models.py # 测试数据模型
|
||
├── test_config.py # 测试配置管理
|
||
├── test_file_handler.py # 测试文件处理
|
||
├── test_batch_client.py # 测试Batch客户端
|
||
├── test_direct_client.py # 测试Direct客户端
|
||
├── test_client_factory.py # 测试客户端工厂
|
||
├── test_prompt_builder.py # 测试Prompt构建
|
||
└── test_main.py # 测试主流程
|
||
```
|
||
|
||
### 8.2 核心测试用例
|
||
|
||
#### 8.2.1 文件处理测试
|
||
|
||
```python
|
||
class TestFileHandler:
|
||
"""测试FileHandler组合类"""
|
||
|
||
def test_read_products_success(self):
|
||
"""测试成功读取产品列表"""
|
||
pass
|
||
|
||
def test_read_products_with_full_data(self):
|
||
"""测试读取完整数据"""
|
||
pass
|
||
|
||
def test_read_categories_success(self):
|
||
"""测试成功读取类别配置"""
|
||
pass
|
||
|
||
def test_merge_and_write_csv(self):
|
||
"""测试合并数据并写入CSV"""
|
||
pass
|
||
|
||
def test_read_file_not_found(self):
|
||
"""测试读取不存在的文件"""
|
||
pass
|
||
|
||
def test_read_missing_columns(self):
|
||
"""测试缺少必要列的Excel"""
|
||
pass
|
||
```
|
||
|
||
#### 8.2.2 LLM客户端测试
|
||
|
||
```python
|
||
class TestBatchClient:
|
||
"""测试BatchClient类"""
|
||
|
||
def test_classify_products_success(self):
|
||
"""测试Batch模式分类成功"""
|
||
pass
|
||
|
||
def test_upload_file_success(self):
|
||
"""测试成功上传文件"""
|
||
pass
|
||
|
||
def test_wait_for_completion_timeout(self):
|
||
"""测试等待超时"""
|
||
pass
|
||
|
||
class TestDirectClient:
|
||
"""测试DirectClient类"""
|
||
|
||
def test_classify_products_success(self):
|
||
"""测试Direct模式分类成功"""
|
||
pass
|
||
|
||
def test_classify_with_retry(self):
|
||
"""测试带重试的分类"""
|
||
pass
|
||
|
||
def test_parse_response(self):
|
||
"""测试解析响应"""
|
||
pass
|
||
|
||
class TestClientFactory:
|
||
"""测试LLMClientFactory类"""
|
||
|
||
def test_create_batch_client(self):
|
||
"""测试创建Batch客户端"""
|
||
pass
|
||
|
||
def test_create_direct_client(self):
|
||
"""测试创建Direct客户端"""
|
||
pass
|
||
|
||
def test_invalid_mode(self):
|
||
"""测试无效的模式"""
|
||
pass
|
||
```
|
||
|
||
#### 8.2.3 主流程测试
|
||
|
||
```python
|
||
class TestMain:
|
||
"""测试主流程"""
|
||
|
||
def test_main_success_batch_mode(self):
|
||
"""测试Batch模式成功流程"""
|
||
pass
|
||
|
||
def test_main_success_direct_mode(self):
|
||
"""测试Direct模式成功流程"""
|
||
pass
|
||
|
||
def test_main_with_csv_output(self):
|
||
"""测试带CSV输出的流程"""
|
||
pass
|
||
|
||
def test_main_file_not_found(self):
|
||
"""测试输入文件不存在"""
|
||
pass
|
||
|
||
def test_main_api_error(self):
|
||
"""测试API调用失败"""
|
||
pass
|
||
```
|
||
|
||
### 8.3 集成测试
|
||
|
||
```python
|
||
class TestIntegration:
|
||
"""集成测试"""
|
||
|
||
@pytest.mark.integration
|
||
def test_end_to_end_batch_mode(self):
|
||
"""Batch模式端到端测试"""
|
||
pass
|
||
|
||
@pytest.mark.integration
|
||
def test_end_to_end_direct_mode(self):
|
||
"""Direct模式端到端测试"""
|
||
pass
|
||
|
||
@pytest.mark.integration
|
||
def test_end_to_end_with_csv(self):
|
||
"""带CSV输出的端到端测试"""
|
||
pass
|
||
```
|
||
|
||
## 9 配置管理
|
||
|
||
### 9.1 配置文件(config.yaml)
|
||
|
||
```yaml
|
||
# API配置
|
||
api:
|
||
mode: "batch" # 调用模式:batch(批量模式) 或 direct(直接模式)
|
||
api_key: null # API密钥,若为null则从环境变量DASHSCOPE_API_KEY读取
|
||
base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
|
||
model: "qwen-flash"
|
||
completion_window: "24h" # Batch模式的等待时间
|
||
max_retries: 3 # Direct模式的最大重试次数
|
||
retry_delay: 1 # Direct模式的重试延迟(秒)
|
||
|
||
# 文件路径配置
|
||
files:
|
||
input_file: "input/product_list.xlsx"
|
||
category_file: "config/product_type.jsonl"
|
||
output_file: "output/classification_result.jsonl"
|
||
csv_output_file: "output/classification_result.csv" # CSV输出(可选)
|
||
temp_request_file: "temp_batch_requests.jsonl"
|
||
|
||
# 日志配置
|
||
logging:
|
||
level: "INFO"
|
||
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||
file: "logs/classifier.log"
|
||
|
||
# Batch配置
|
||
batch:
|
||
poll_interval: 60 # 轮询间隔(秒)
|
||
max_wait_time: 86400 # 最大等待时间(秒,24小时)
|
||
```
|
||
|
||
### 9.2 环境变量
|
||
|
||
| 变量名 | 说明 | 是否必需 | 默认值 |
|
||
|--------|------|----------|--------|
|
||
| DASHSCOPE_API_KEY | 通义千问API Key | 是 | 无 |
|
||
|
||
### 9.3 命令行参数
|
||
|
||
```bash
|
||
python -m src.main \
|
||
--config config/config.yaml \
|
||
--mode direct \
|
||
--input-file input/product_list.xlsx \
|
||
--output-file output/result.jsonl \
|
||
--csv-output-file output/result.csv
|
||
```
|
||
|
||
## 10 错误处理
|
||
|
||
### 10.1 错误分类
|
||
|
||
| 错误类型 | 处理策略 | 日志级别 |
|
||
|---------|---------|---------|
|
||
| 文件不存在 | 立即终止,提示用户 | ERROR |
|
||
| 文件格式错误 | 立即终止,提示具体错误 | ERROR |
|
||
| API认证失败 | 立即终止,检查API Key | ERROR |
|
||
| 网络错误 | 重试3次,失败后终止 | WARNING → ERROR |
|
||
| Batch任务失败 | 记录错误,保存错误文件 | ERROR |
|
||
| 部分产品分类失败 | 继续处理,记录失败项 | WARNING |
|
||
|
||
### 10.2 重试机制
|
||
|
||
- **API调用失败**:最多重试3次,指数退避(1s, 2s, 4s)
|
||
- **文件下载失败**:最多重试3次
|
||
- **任务状态查询**:持续轮询直到完成或超时
|
||
|
||
### 10.3 日志记录
|
||
|
||
所有错误信息应记录到日志文件(logs/目录):
|
||
|
||
- **INFO级别**:正常流程信息
|
||
- **WARNING级别**:警告信息(如部分产品失败)
|
||
- **ERROR级别**:错误信息(如文件不存在、API失败)
|
||
|
||
## 11 依赖项
|
||
|
||
### 11.1 Python版本
|
||
|
||
- Python >= 3.9
|
||
|
||
### 11.2 第三方库
|
||
|
||
- openai:通义千问API调用
|
||
- pandas:Excel文件读取和CSV写入
|
||
- pydantic:数据模型验证
|
||
- openpyxl:Excel文件解析
|
||
- pyyaml:配置文件解析
|
||
- pytest:单元测试框架
|
||
|
||
### 11.3 依赖管理
|
||
|
||
使用Poetry进行依赖管理,配置文件为`pyproject.toml`。
|
||
|
||
## 12 输入输出格式说明
|
||
|
||
### 12.1 输入文件格式
|
||
|
||
**产品列表Excel(input/product_list.xlsx)**:
|
||
- 必须包含列:`产品编号`、`产品名称`、`景区名称`
|
||
- 编码:UTF-8
|
||
- 格式:.xlsx
|
||
|
||
**产品类别配置JSONL(config/product_type.jsonl)**:
|
||
- 每行一个JSON对象
|
||
- 必须包含字段:`category`、`type`、`sub_type`
|
||
- 编码:UTF-8
|
||
- 格式示例:
|
||
```json
|
||
{"category":"门票","type":"自然类","sub_type":"自然风光"}
|
||
{"category":"住宿","type":"商务酒店","sub_type":""}
|
||
```
|
||
|
||
### 12.2 输出文件格式
|
||
|
||
**分类结果JSONL(output/classification_result.jsonl)**:
|
||
- 每行一个JSON对象
|
||
- 包含字段:`product_id`、`category`、`type`、`sub_type`
|
||
- 编码:UTF-8
|
||
|
||
**分类结果CSV(output/classification_result.csv,可选)**:
|
||
- 包含Excel原始所有列 + 3个分类列
|
||
- 编码:UTF-8-BOM(确保Excel正确识别中文)
|
||
- 格式:标准CSV
|
||
|
||
---
|
||
|
||
## 附录:项目使用说明
|
||
|
||
### A.1 安装依赖
|
||
|
||
```bash
|
||
poetry install
|
||
```
|
||
|
||
### A.2 配置API Key
|
||
|
||
```bash
|
||
export DASHSCOPE_API_KEY="your_api_key"
|
||
```
|
||
|
||
### A.3 运行程序
|
||
|
||
```bash
|
||
# 使用Batch模式
|
||
python -m src.main --mode batch
|
||
|
||
# 使用Direct模式
|
||
python -m src.main --mode direct
|
||
|
||
# 指定配置文件
|
||
python -m src.main --config config/config.yaml
|
||
```
|
||
|
||
### A.4 运行测试
|
||
|
||
```bash
|
||
# 运行所有测试
|
||
poetry run pytest
|
||
|
||
# 运行特定测试
|
||
poetry run pytest tests/test_main.py
|
||
|
||
# 运行集成测试
|
||
poetry run pytest -m integration
|
||
```
|