# 产品分类模块架构设计 ## 1 模块概述 ### 1.1 功能描述 **功能**:读取产品列表Excel文件,使用通义千问LLM对每个产品进行自动分类。 **输入**: - 产品列表文件路径(Excel格式):包含产品编号、产品名称、景区名称等字段 - 产品类别配置文件路径(JSONL格式):包含类别、类型、子类型的分类体系 **输出**: - 分类结果文件(JSONL格式):包含产品编号及其对应的分类结果(类别、类型、子类型) - 分类结果文件(CSV格式,可选):包含原始产品数据及分类结果 ### 1.2 main.py接口 ```python def main(config: AppConfig) -> None: """ 产品分类主函数 Args: config: 应用配置对象,包含以下内容: - files.input_file: 产品列表Excel文件路径 - files.category_file: 产品类别配置JSONL文件路径 - files.output_file: 分类结果JSONL输出文件路径 - files.csv_output_file: 分类结果CSV输出文件路径(可选) - api.api_key: 通义千问API Key,若为None则从环境变量读取 - api.base_url: 通义千问API的base_url - api.model: 使用的模型名称 - api.mode: 调用模式(batch/direct) Returns: None Raises: FileNotFoundError: 输入文件不存在 ValueError: 文件格式错误或API调用失败 """ pass ``` ### 1.3 调用方式 命令行调用示例: ```bash # 使用配置文件 python -m src.main --config config/config.yaml # 指定调用模式 python -m src.main --mode batch # Batch批量模式 python -m src.main --mode direct # 直接请求模式 # 指定CSV输出 python -m src.main --csv-output-file output/result.csv ``` ## 2 架构设计 ### 2.1 项目结构 ``` src/ ├── core/ # 核心实现层 │ ├── __init__.py │ ├── file_handler.py # 文件处理实现 │ ├── batch_client.py # Batch模式API客户端 │ ├── direct_client.py # Direct模式API客户端 │ ├── client_factory.py # 客户端工厂 │ └── prompt_builder.py # Prompt构建实现 ├── models.py # 数据模型定义 ├── interfaces.py # 抽象接口定义 ├── config.py # 配置管理 └── main.py # 主程序入口 ``` ### 2.2 模块职责划分 #### 2.2.1 core/file_handler.py 文件处理的统一接口和各个专门的处理类: - **FileHandler** - 文件处理统一接口(组合类) - **ProductReader** - 产品列表读取实现 - **CategoryReader** - 类别配置读取实现 - **BatchFileWriter** - Batch请求文件写入实现 - **ResultParser** - 结果解析实现 - **ResultWriter** - 结果写入实现(JSONL) - **ResultMerger** - 结果合并实现 - **CSVWriter** - CSV文件写入实现 FileHandler通过组合方式,内部持有上述各专门处理类的实例,对外提供统一接口。 #### 2.2.2 core/batch_client.py与core/direct_client.py 负责与通义千问API交互,支持两种调用模式: **BatchClient(Batch批量模式)**: - 上传请求文件 - 创建Batch任务 - 轮询任务状态 - 下载结果文件 - 处理API错误 **DirectClient(直接请求模式)**: - 循环调用API进行实时分类 - 内置重试机制 - 错误处理 #### 2.2.3 core/client_factory.py 客户端工厂类,负责根据配置创建对应的API客户端: - 支持Batch和Direct两种模式 - 统一的创建接口 - 根据配置自动选择合适的客户端 #### 2.2.4 core/prompt_builder.py 负责构建LLM的prompt: - 根据类别配置生成分类指引 - 为每个产品构建分类请求prompt - 定义输出格式要求 #### 2.2.5 models.py 定义所有数据结构: - 输入/输出数据模型 - 配置数据模型 - 内部传递数据模型 #### 2.2.6 interfaces.py 定义抽象接口,实现依赖倒置: - 文件处理接口 - LLM客户端接口 - Prompt构建接口 #### 2.2.7 config.py 配置管理: - 读取配置文件 - 配置验证 - 环境变量支持 #### 2.2.8 main.py 主程序编排逻辑,调用各个模块完成任务 ### 2.3 依赖关系图 ```mermaid graph TD main[main.py] --> config[config.py] main --> interfaces[interfaces.py] main --> models[models.py] interfaces --> file_handler[core/file_handler.py] interfaces --> client_factory[core/client_factory.py] interfaces --> prompt_builder[core/prompt_builder.py] client_factory --> batch_client[core/batch_client.py] client_factory --> direct_client[core/direct_client.py] file_handler --> models batch_client --> models direct_client --> models prompt_builder --> models style main fill:#e1f5ff style interfaces fill:#fff4e1 style models fill:#f0f0f0 style config fill:#f0f0f0 ``` ### 2.4 整体业务流程 ```mermaid graph LR A[读取产品列表Excel] --> B[读取产品类别配置] B --> C[构建系统Prompt] C --> D[生成请求列表] D --> E{调用模式} E -->|Batch模式| F[上传→创建→等待→下载] E -->|Direct模式| G[循环调用API] F --> H[解析结果] G --> H H --> I[写入JSONL文件] I --> J{是否需要CSV?} J -->|是| K[合并原始数据] J -->|否| L[结束] K --> M[写入CSV文件] M --> L ``` ## 3 接口设计(interfaces.py) ### 3.1 接口类图 ```mermaid classDiagram %% 文件处理相关接口 class IFileHandler { <> +read_products(file_path) List~ProductInput~ +read_products_with_full_data(file_path) tuple +read_categories(file_path) List~ProductCategory~ +write_batch_requests(requests, output_path) str +parse_batch_responses(file_content) List~ClassificationResult~ +write_results(results, output_path) None +merge_and_write_csv(products_data, results, output_path) None } class IProductReader { <> +read(file_path) List~ProductInput~ +read_with_full_data(file_path) tuple } class ICategoryReader { <> +read(file_path) List~ProductCategory~ } class IBatchFileWriter { <> +write(requests, output_path) str } class IResultParser { <> +parse(file_content) List~ClassificationResult~ } class IResultWriter { <> +write(results, output_path) None } class IResultMerger { <> +merge(products_data, results) List~ProductWithClassification~ } class ICSVWriter { <> +write(data, output_path) None } IFileHandler o-- IProductReader IFileHandler o-- ICategoryReader IFileHandler o-- IBatchFileWriter IFileHandler o-- IResultParser IFileHandler o-- IResultWriter IFileHandler o-- IResultMerger IFileHandler o-- ICSVWriter %% LLM客户端相关接口 class ILLMClient { <> +classify_products(products, system_prompt, requests) List~ClassificationResult~ } class IBatchClient { <> +upload_file(file_path) str +create_batch(input_file_id, endpoint, completion_window) str +get_batch_status(batch_id) BatchTaskInfo +download_result(file_id) str +wait_for_completion(batch_id, poll_interval) BatchTaskInfo } %% Prompt构建接口 class IPromptBuilder { <> +build_system_prompt(categories) str +build_user_prompt(product) str +build_classification_request(product, system_prompt) dict } ``` ### 3.2 IFileHandler接口 文件处理统一接口,通过组合方式整合各个专门的文件处理类。 ```python class IFileHandler(ABC): """文件处理统一接口""" def __init__( self, product_reader: IProductReader, category_reader: ICategoryReader, batch_file_writer: IBatchFileWriter, result_parser: IResultParser, result_writer: IResultWriter, result_merger: IResultMerger, csv_writer: ICSVWriter ): """ 初始化文件处理器 Args: product_reader: 产品读取器 category_reader: 类别读取器 batch_file_writer: Batch文件写入器 result_parser: 结果解析器 result_writer: 结果写入器 result_merger: 结果合并器 csv_writer: CSV写入器 """ pass def read_products(self, file_path: str) -> List[ProductInput]: """ 读取产品列表 Args: file_path: Excel文件路径 Returns: List[ProductInput]: 产品列表 """ pass def read_products_with_full_data( self, file_path: str ) -> tuple[List[ProductInput], List[Dict[str, Any]]]: """ 读取产品列表和完整原始数据 Args: file_path: Excel文件路径 Returns: tuple: (产品列表, 完整原始数据) """ pass def read_categories(self, file_path: str) -> List[ProductCategory]: """ 读取类别配置 Args: file_path: JSONL文件路径 Returns: List[ProductCategory]: 类别列表 """ pass def write_batch_requests( self, requests: List[dict], output_path: str ) -> str: """ 写入Batch请求 Args: requests: Batch请求列表 output_path: 输出文件路径 Returns: str: 生成的文件路径 """ pass def parse_batch_responses(self, file_content: str) -> List[ClassificationResult]: """ 解析Batch响应 Args: file_content: JSONL文件内容字符串 Returns: List[ClassificationResult]: 分类结果列表 """ pass def write_results( self, results: List[ClassificationResult], output_path: str ) -> None: """ 写入分类结果 Args: results: 分类结果列表 output_path: 输出文件路径 """ pass def merge_and_write_csv( self, products_with_data: List[Dict[str, Any]], classification_results: List[ClassificationResult], output_path: str ) -> None: """ 合并数据并写入CSV文件 Args: products_with_data: 完整原始数据 classification_results: 分类结果 output_path: CSV输出路径 """ pass ``` ### 3.3 专门文件处理接口 #### 3.3.1 IProductReader接口 ```python class IProductReader(ABC): """产品列表读取接口""" @abstractmethod def read(self, file_path: str) -> List[ProductInput]: """ 读取产品列表Excel文件 Args: file_path: Excel文件路径 Returns: List[ProductInput]: 产品列表 Raises: FileNotFoundError: 文件不存在 ValueError: 文件格式错误或缺少必要列 """ pass @abstractmethod def read_with_full_data(self, file_path: str) -> tuple[List[ProductInput], List[Dict[str, Any]]]: """ 读取产品列表和完整原始数据 Args: file_path: Excel文件路径 Returns: tuple: (产品列表, 完整原始数据列表) """ pass ``` #### 3.3.2 ICategoryReader接口 ```python class ICategoryReader(ABC): """类别配置读取接口""" @abstractmethod def read(self, file_path: str) -> List[ProductCategory]: """ 读取产品类别配置JSONL文件 Args: file_path: JSONL文件路径 Returns: List[ProductCategory]: 类别列表 Raises: FileNotFoundError: 文件不存在 ValueError: JSON格式错误 """ pass ``` #### 3.3.3 IBatchFileWriter接口 ```python class IBatchFileWriter(ABC): """Batch请求文件写入接口""" @abstractmethod def write( self, requests: List[dict], output_path: str ) -> str: """ 将Batch请求写入JSONL文件 Args: requests: Batch请求列表 output_path: 输出文件路径 Returns: str: 生成的文件路径 """ pass ``` #### 3.3.4 IResultParser接口 ```python class IResultParser(ABC): """结果解析接口""" @abstractmethod def parse(self, file_content: str) -> List[ClassificationResult]: """ 解析Batch响应JSONL内容 Args: file_content: JSONL文件内容字符串 Returns: List[ClassificationResult]: 分类结果列表 Raises: ValueError: 解析错误 """ pass ``` #### 3.3.5 IResultWriter接口 ```python class IResultWriter(ABC): """结果写入接口""" @abstractmethod def write( self, results: List[ClassificationResult], output_path: str ) -> None: """ 写入分类结果到JSONL文件 Args: results: 分类结果列表 output_path: 输出文件路径 """ pass ``` #### 3.3.6 IResultMerger接口 ```python class IResultMerger(ABC): """结果合并接口""" @abstractmethod def merge( self, products_with_data: List[Dict[str, Any]], classification_results: List[ClassificationResult] ) -> List[ProductWithClassification]: """ 合并原始产品数据和分类结果 Args: products_with_data: 原始产品数据列表(包含所有Excel字段) classification_results: 分类结果列表 Returns: List[ProductWithClassification]: 合并后的完整数据列表 Raises: ValueError: 数据不匹配或缺少必要字段 """ pass ``` #### 3.3.7 ICSVWriter接口 ```python class ICSVWriter(ABC): """CSV写入接口""" @abstractmethod def write( self, data: List[ProductWithClassification], output_path: str ) -> None: """ 写入完整数据到CSV文件 Args: data: 完整产品数据列表 output_path: 输出文件路径 """ pass ``` ### 3.4 ILLMClient接口 LLM客户端统一接口,采用策略模式封装两种调用方式。 ```python class ILLMClient(ABC): """LLM客户端统一接口(策略模式)""" @abstractmethod def classify_products( self, products: List[ProductInput], system_prompt: str, requests: List[dict] ) -> List[ClassificationResult]: """ 对产品列表进行分类(统一接口) Args: products: 产品列表 system_prompt: 系统提示词 requests: 预构建的请求列表(Batch格式) Returns: List[ClassificationResult]: 分类结果列表 Raises: APIError: API调用失败 TimeoutError: 超时(仅Batch模式) """ pass ``` ### 3.5 IPromptBuilder接口 ```python class IPromptBuilder(ABC): """Prompt构建接口""" @abstractmethod def build_system_prompt(self, categories: List[ProductCategory]) -> str: """ 构建系统提示词 Args: categories: 产品类别列表 Returns: str: 系统提示词 """ pass @abstractmethod def build_user_prompt(self, product: ProductInput) -> str: """ 构建用户提示词 Args: product: 产品信息 Returns: str: 用户提示词 """ pass @abstractmethod def build_classification_request( self, product: ProductInput, system_prompt: str, model: str = "qwen-flash" ) -> dict: """ 构建完整的分类请求 Args: product: 产品信息 system_prompt: 系统提示词 model: 模型名称 Returns: dict: Batch请求格式的字典 """ pass ``` ## 4 数据模型(models.py) ### 4.1 输入数据模型 ```python from pydantic import BaseModel, Field from typing import Optional, Dict, Any, List class ProductInput(BaseModel): """产品输入数据""" product_id: str = Field(..., description="产品编号") product_name: str = Field(..., description="产品名称") scenic_spot: str = Field(..., description="景区名称") class ProductCategory(BaseModel): """产品类别定义""" category: str = Field(..., description="类别,如:门票、住宿、餐饮") type: str = Field(..., description="类型") sub_type: str = Field(default="", description="子类型") ``` ### 4.2 输出数据模型 ```python class ClassificationResult(BaseModel): """分类结果""" product_id: str = Field(..., description="产品编号") category: str = Field(..., description="类别") type: str = Field(..., description="类型") sub_type: str = Field(default="", description="子类型") class ProductWithClassification(BaseModel): """完整的产品数据(原始数据+分类结果)""" original_data: Dict[str, Any] = Field(..., description="原始Excel中的所有字段数据") category: str = Field(..., description="类别") type: str = Field(..., description="类型") sub_type: str = Field(default="", description="子类型") ``` ### 4.3 内部数据模型 ```python class BatchRequest(BaseModel): """Batch API请求格式""" custom_id: str = Field(..., description="自定义请求ID,对应产品编号") method: str = Field(default="POST", description="请求方法") url: str = Field(default="/v1/chat/completions", description="API路径") body: dict = Field(..., description="请求体") class BatchResponse(BaseModel): """Batch API响应格式""" id: str = Field(..., description="请求ID") custom_id: str = Field(..., description="自定义请求ID") response: dict = Field(..., description="响应内容") error: Optional[dict] = Field(default=None, description="错误信息") class BatchTaskInfo(BaseModel): """Batch任务信息""" task_id: str = Field(..., description="Batch任务ID") status: str = Field(..., description="任务状态") input_file_id: str = Field(..., description="输入文件ID") output_file_id: Optional[str] = Field(default=None, description="输出文件ID") error_file_id: Optional[str] = Field(default=None, description="错误文件ID") ``` ### 4.4 配置数据模型 ```python from enum import Enum class LLMClientMode(str, Enum): """LLM客户端模式""" BATCH = "batch" DIRECT = "direct" class ApiConfig(BaseModel): """API配置""" mode: LLMClientMode = Field( default=LLMClientMode.BATCH, description="调用模式:batch(批量)或direct(直接)" ) api_key: Optional[str] = Field(default=None, description="API密钥") base_url: str = Field(..., description="API基础URL") model: str = Field(..., description="模型名称") completion_window: str = Field(default="24h", description="Batch模式等待时间") max_retries: int = Field(default=3, description="Direct模式最大重试次数") retry_delay: int = Field(default=1, description="Direct模式重试延迟(秒)") class FilesConfig(BaseModel): """文件配置""" input_file: str = Field(..., description="产品列表Excel文件路径") category_file: str = Field(..., description="产品类别配置JSONL文件路径") output_file: str = Field(..., description="分类结果JSONL输出文件路径") csv_output_file: Optional[str] = Field( default=None, description="分类结果CSV输出文件路径(可选)" ) temp_request_file: str = Field( default="temp_batch_requests.jsonl", description="临时请求文件路径" ) class LoggingConfig(BaseModel): """日志配置""" level: str = Field(default="INFO", description="日志级别") format: str = Field( default="%(asctime)s - %(name)s - %(levelname)s - %(message)s", description="日志格式" ) file: str = Field(default="logs/classifier.log", description="日志文件路径") class BatchConfig(BaseModel): """Batch配置""" poll_interval: int = Field(default=60, description="轮询间隔(秒)") max_wait_time: int = Field(default=86400, description="最大等待时间(秒)") class AppConfig(BaseModel): """应用配置""" api: ApiConfig files: FilesConfig logging: LoggingConfig batch: BatchConfig ``` ## 5 核心实现(core/) ### 5.1 file_handler.py #### 5.1.1 ProductReader类 **功能**:读取Excel格式的产品列表 **算法流程**: ```mermaid graph TD Start([开始]) --> ReadExcel[使用pandas读取Excel] ReadExcel --> CheckColumns{检查必要列} CheckColumns -->|缺少| Error[抛出ValueError] CheckColumns -->|完整| ParseRows[遍历每一行] ParseRows --> CreateProduct[创建ProductInput对象] CreateProduct --> SaveFullData{是否需要完整数据?} SaveFullData -->|是| ToDict[转换为字典保存] SaveFullData -->|否| Skip[跳过] ToDict --> AddToList[添加到列表] Skip --> AddToList AddToList --> HasMore{还有行?} HasMore -->|是| ParseRows HasMore -->|否| Return([返回结果]) Error --> End([结束]) ``` **方法**: ```python def read(self, file_path: str) -> List[ProductInput]: """读取产品列表""" pass def read_with_full_data(self, file_path: str) -> tuple[List[ProductInput], List[Dict[str, Any]]]: """读取产品列表和完整原始数据""" pass ``` #### 5.1.2 ResultMerger类 **功能**:合并原始产品数据和分类结果 **算法流程**: ```mermaid graph TD Start([开始]) --> BuildDict[构建分类结果字典
key: product_id] BuildDict --> LoopProducts[遍历原始产品数据] LoopProducts --> GetID[获取产品编号] GetID --> LookupResult{查找分类结果} LookupResult -->|找到| Merge1[合并原始数据+分类] LookupResult -->|未找到| Merge2[原始数据+默认分类] Merge1 --> AddResult[添加到结果列表] Merge2 --> AddResult AddResult --> HasMore{还有产品?} HasMore -->|是| LoopProducts HasMore -->|否| Return([返回合并结果]) ``` #### 5.1.3 CSVWriter类 **功能**:将完整数据写入CSV文件 **算法流程**: ```mermaid graph TD Start([开始]) --> GetColumns[获取所有列名
原始列+分类列] GetColumns --> BuildRows[构建数据行] BuildRows --> LoopData[遍历每条数据] LoopData --> MergeRow[合并原始字段+分类字段] MergeRow --> AddRow[添加到行列表] AddRow --> HasMore{还有数据?} HasMore -->|是| LoopData HasMore -->|否| CreateDF[创建DataFrame] CreateDF --> WriteCSV[写入CSV文件
utf-8-sig编码] WriteCSV --> End([结束]) ``` ### 5.2 batch_client.py与direct_client.py #### 5.2.1 架构类图 ```mermaid classDiagram class ILLMClient { <> +classify_products(products, system_prompt, requests) List~ClassificationResult~ } class BatchClient { -_client: OpenAI -_logger: Logger +classify_products(...) List~ClassificationResult~ -_upload_file(file_path) str -_create_batch(...) str -_wait_for_completion(...) BatchTaskInfo -_download_result(file_id) str } class DirectClient { -_client: OpenAI -_model: str -_logger: Logger -_max_retries: int +classify_products(...) List~ClassificationResult~ -_classify_single_product(...) ClassificationResult -_parse_response(...) ClassificationResult } ILLMClient <|-- BatchClient : 实现 ILLMClient <|-- DirectClient : 实现 ``` #### 5.2.2 BatchClient算法流程 ```mermaid graph TD Start([开始]) --> WriteFile[写入请求文件] WriteFile --> Upload[上传文件] Upload --> CreateBatch[创建Batch任务] CreateBatch --> Poll[轮询任务状态] Poll --> CheckStatus{检查状态} CheckStatus -->|进行中| Wait[等待poll_interval] Wait --> Poll CheckStatus -->|completed| Download[下载结果文件] CheckStatus -->|failed| Error1[抛出错误] CheckStatus -->|expired| Error2[抛出超时错误] Download --> Parse[解析结果] Parse --> Cleanup[清理临时文件] Cleanup --> Return([返回分类结果]) Error1 --> End([结束]) Error2 --> End ``` #### 5.2.3 DirectClient算法流程 ```mermaid graph TD Start([开始]) --> InitResults[初始化结果列表] InitResults --> LoopProducts[遍历产品列表] LoopProducts --> CallAPI[调用chat.completions.create] CallAPI --> CheckResponse{调用成功?} CheckResponse -->|成功| ParseResponse[解析响应] CheckResponse -->|失败| CheckRetry{是否还能重试?} CheckRetry -->|是| Wait[等待retry_delay] Wait --> CallAPI CheckRetry -->|否| LogError[记录错误] LogError --> Skip[跳过该产品] ParseResponse --> AddResult[添加到结果列表] Skip --> HasMore{还有产品?} AddResult --> HasMore HasMore -->|是| LoopProducts HasMore -->|否| Return([返回分类结果]) ``` ### 5.3 client_factory.py #### 5.3.1 工厂模式类图 ```mermaid classDiagram class LLMClientFactory { <> +create_client(mode, ...) ILLMClient } class LLMClientMode { <> BATCH DIRECT } class ILLMClient { <> +classify_products(...) List~ClassificationResult~ } class BatchClient { +classify_products(...) List~ClassificationResult~ } class DirectClient { +classify_products(...) List~ClassificationResult~ } LLMClientFactory ..> LLMClientMode : 使用 LLMClientFactory ..> ILLMClient : 创建 LLMClientFactory ..> BatchClient : 创建 LLMClientFactory ..> DirectClient : 创建 ILLMClient <|-- BatchClient : 实现 ILLMClient <|-- DirectClient : 实现 ``` #### 5.3.2 工厂方法 ```python @staticmethod def create_client( mode: LLMClientMode, api_key: str, base_url: str, model: str, logger: logging.Logger, completion_window: str = "24h", max_retries: int = 3 ) -> ILLMClient: """ 根据模式创建对应的LLM客户端 Args: mode: 客户端模式(batch或direct) api_key: API密钥 base_url: API基础URL model: 模型名称 logger: 日志记录器 completion_window: Batch模式的等待时间(仅Batch模式使用) max_retries: 最大重试次数(仅Direct模式使用) Returns: ILLMClient: 对应模式的客户端实例 Raises: ValueError: 不支持的模式 """ pass ``` ### 5.4 prompt_builder.py #### 5.4.1 Prompt构建流程 ```mermaid graph TD Start([开始]) --> BuildSystem[构建系统Prompt] BuildSystem --> FormatCategories[格式化类别列表] FormatCategories --> AddInstructions[添加分类指令] AddInstructions --> AddOutputFormat[添加输出格式要求] AddOutputFormat --> SystemReady([系统Prompt完成]) Start2([为每个产品]) --> BuildUser[构建用户Prompt] BuildUser --> ExtractInfo[提取产品信息] ExtractInfo --> FormatProduct[格式化产品描述] FormatProduct --> UserReady([用户Prompt完成]) SystemReady --> Combine[组合系统+用户Prompt] UserReady --> Combine Combine --> BuildRequest[构建Batch请求格式] BuildRequest --> End([完成]) ``` #### 5.4.2 Prompt模板 **系统提示词模板**: ```python def build_system_prompt(self, categories: List[ProductCategory]) -> str: """ 你是一个产品分类专家。请根据以下产品类别体系对产品进行分类: {格式化的类别列表} 请严格按照以下JSON格式输出分类结果: { "category": "类别", "type": "类型", "sub_type": "子类型" } 分类原则: 1. 必须从提供的类别体系中选择最合适的分类 2. 如果没有合适的子类型,sub_type可以为空字符串 3. 只输出JSON格式,不要包含其他说明文字 """ pass ``` **用户提示词模板**: ```python def build_user_prompt(self, product: ProductInput) -> str: """ 请对以下产品进行分类: 产品编号: {product.product_id} 产品名称: {product.product_name} 景区名称: {product.scenic_spot} 请输出分类结果。 """ pass ``` ## 6 主程序逻辑(main.py) ### 6.1 主流程图 ```mermaid graph TD Start([开始]) --> Init[初始化配置和依赖] Init --> CreateFactory[使用工厂创建LLM客户端] CreateFactory --> CheckCSV{是否需要CSV输出?} CheckCSV -->|是| ReadFull[读取产品列表
含完整数据] CheckCSV -->|否| ReadSimple[读取产品列表] ReadFull --> ReadCategories ReadSimple --> ReadCategories[读取类别配置] ReadCategories --> BuildPrompt[构建系统Prompt] BuildPrompt --> GenRequests[生成请求列表] GenRequests --> Classify[调用客户端classify_products] Classify --> WriteJSONL[写入JSONL结果] WriteJSONL --> CheckCSV2{是否需要CSV?} CheckCSV2 -->|是| MergeData[合并原始数据和分类结果] CheckCSV2 -->|否| LogStats MergeData --> WriteCSV[写入CSV文件] WriteCSV --> LogStats[记录统计信息] LogStats --> End([结束]) ``` ### 6.2 main函数伪代码 ```python def main(config: AppConfig) -> None: """主函数""" # 1. 初始化日志 logger = setup_logger(config.logging) # 2. 获取API Key api_key = config.api.api_key or get_env_api_key() # 3. 初始化文件处理器 file_handler = FileHandler( product_reader=ProductReader(), category_reader=CategoryReader(), batch_file_writer=BatchFileWriter(), result_parser=ResultParser(), result_writer=ResultWriter(), result_merger=ResultMerger(), csv_writer=CSVWriter() ) # 4. 使用工厂创建LLM客户端 llm_client = LLMClientFactory.create_client( mode=config.api.mode, api_key=api_key, base_url=config.api.base_url, model=config.api.model, logger=logger, completion_window=config.api.completion_window, max_retries=config.api.max_retries ) # 5. 初始化Prompt构建器 prompt_builder = PromptBuilder() try: # 6. 读取输入数据 logger.info(f"读取产品列表: {config.files.input_file}") if config.files.csv_output_file: products, products_with_data = file_handler.read_products_with_full_data( config.files.input_file ) else: products = file_handler.read_products(config.files.input_file) products_with_data = None logger.info(f"共读取 {len(products)} 个产品") logger.info(f"读取类别配置: {config.files.category_file}") categories = file_handler.read_categories(config.files.category_file) logger.info(f"共读取 {len(categories)} 个类别") # 7. 构建Prompt和请求 logger.info("构建系统Prompt") system_prompt = prompt_builder.build_system_prompt(categories) logger.info("生成分类请求") requests = [] for product in products: request = prompt_builder.build_classification_request( product, system_prompt, config.api.model ) requests.append(request) logger.info(f"生成 {len(requests)} 个请求") # 8. 使用统一接口进行分类 logger.info(f"使用 {config.api.mode.value} 模式进行产品分类") results = llm_client.classify_products(products, system_prompt, requests) # 9. 写入JSONL输出文件 logger.info(f"写入分类结果: {config.files.output_file}") file_handler.write_results(results, config.files.output_file) # 10. 如果配置了CSV输出,则生成CSV文件 if config.files.csv_output_file and products_with_data: logger.info(f"生成CSV文件: {config.files.csv_output_file}") file_handler.merge_and_write_csv( products_with_data, results, config.files.csv_output_file ) logger.info("CSV文件生成完成") # 11. 统计信息 success_count = len(results) failed_count = len(products) - success_count logger.info(f"分类完成: 成功 {success_count}, 失败 {failed_count}") if failed_count > 0: logger.warning(f"有 {failed_count} 个产品分类失败") logger.info("="*60) logger.info("产品分类程序成功完成") logger.info("="*60) except FileNotFoundError as e: logger.error(f"文件不存在: {e}") raise except ValueError as e: logger.error(f"数据格式错误: {e}") raise except Exception as e: logger.error(f"处理过程中发生错误: {e}") raise ``` ### 6.3 函数调用关系图 ```mermaid graph TD main["main() --- 功能: 主程序入口 输入: AppConfig 输出: None"] setup_logger["setup_logger() --- 功能: 初始化日志系统 输入: LoggingConfig 输出: Logger"] create_client["LLMClientFactory.create_client() --- 功能: 创建LLM客户端 输入: mode, api_key等 输出: ILLMClient"] read_products["file_handler.read_products_with_full_data() --- 功能: 读取产品列表和完整数据 输入: file_path 输出: tuple"] read_categories["file_handler.read_categories() --- 功能: 读取类别配置 输入: file_path 输出: List~ProductCategory~"] build_system_prompt["prompt_builder.build_system_prompt() --- 功能: 构建系统Prompt 输入: List~ProductCategory~ 输出: str"] build_request["prompt_builder.build_classification_request() --- 功能: 构建分类请求 输入: ProductInput, system_prompt 输出: dict"] classify["llm_client.classify_products() --- 功能: 执行分类 输入: products, system_prompt, requests 输出: List~ClassificationResult~"] write_results["file_handler.write_results() --- 功能: 写入JSONL结果 输入: results, output_path 输出: None"] merge_csv["file_handler.merge_and_write_csv() --- 功能: 合并并写入CSV 输入: products_data, results, path 输出: None"] main --> setup_logger main --> create_client main --> read_products main --> read_categories main --> build_system_prompt main --> build_request main --> classify main --> write_results main --> merge_csv ``` ## 7 功能扩展 ### 7.1 直接请求模式扩展 #### 7.1.1 扩展背景 Batch模式响应较慢(需要上传文件、创建任务、轮询等待、下载结果),在某些场景下需要更快速的直接请求方式。为兼容两种调用方式,对现有架构进行最小化修改。 #### 7.1.2 两种调用方式对比 | 特性 | Batch模式 | 直接请求模式 | |------|----------|-------------| | 调用方式 | 文件批量处理 | 实时单次请求 | | 响应速度 | 慢(需等待调度) | 快(实时响应) | | 费用 | 实时调用的50% | 标准费用 | | 适用场景 | 大规模数据处理 | 小批量快速分类 | | 流程步骤 | 上传→创建→等待→下载 | 直接调用API | #### 7.1.3 架构修改方案 **核心修改点**: 1. **新增统一LLM客户端接口**(interfaces.py):使用策略模式封装两种调用方式 2. **重构BatchClient实现ILLMClient**(core/batch_client.py):将原有方法封装为统一接口 3. **新增DirectClient实现ILLMClient**(core/direct_client.py):实现直接请求模式 4. **新增客户端工厂**(core/client_factory.py):根据配置创建对应客户端 5. **修改main.py适配统一接口**:简化主流程,使用统一接口调用 **修改后的优点**: - 接口统一:对上层main.py提供一致的调用接口 - 易于扩展:可轻松添加新的调用模式 - 配置驱动:无需改代码即可切换模式 - 保持兼容:默认行为与原有系统一致 ### 7.2 CSV输出功能扩展 #### 7.2.1 扩展背景 当前系统输出的是JSONL格式的分类结果,仅包含产品编号和分类信息。实际业务需求是输出包含原始数据和分类结果的CSV文件,便于Excel打开和进一步分析。 #### 7.2.2 功能对比 | 特性 | 原有实现 | 扩展后实现 | |------|---------|-----------| | 输出格式 | JSONL | JSONL + CSV(可选) | | 输出内容 | 仅分类结果 | 原始数据 + 分类结果 | | 字段数量 | 4个(product_id + 3个分类) | Excel原始字段 + 3个分类 | | 适用场景 | 程序间数据交换 | 人工查看、Excel分析 | #### 7.2.3 架构扩展方案 **核心扩展点**: 1. **新增ProductWithClassification数据模型**(models.py):包含原始数据和分类结果 2. **新增IResultMerger接口**(interfaces.py):定义结果合并抽象 3. **实现ResultMerger**(core/file_handler.py):合并原始数据和分类结果 4. **修改ProductReader**(core/file_handler.py):支持读取完整数据 5. **新增ICSVWriter接口和实现**(core/file_handler.py):定义CSV写入功能 6. **扩展FileHandler统一接口**(core/file_handler.py):添加CSV相关方法 7. **修改main.py**(main.py):支持可选的CSV输出 **扩展后的优点**: - 业务价值高:直接输出可用的Excel文件 - 数据完整:保留所有原始信息 - 向后兼容:不影响现有功能,可选启用 - 易于使用:CSV格式Excel可直接打开 #### 7.2.4 输出文件示例 **JSONL输出**(classification_result.jsonl): ```json {"product_id":"9256359","category":"住宿","type":"商务酒店","sub_type":""} {"product_id":"9262851","category":"住宿","type":"度假酒店","sub_type":""} ``` **CSV输出**(classification_result.csv): ```csv 产品编号,产品名称,景区名称,价格,库存,上架时间,类别,类型,子类型 9256359,某某商务酒店,某景区,299,50,2024-01-01,住宿,商务酒店, 9262851,某某度假酒店,某景区,599,30,2024-01-02,住宿,度假酒店, ``` ## 8 测试设计 ### 8.1 测试文件结构 ``` tests/ ├── __init__.py ├── conftest.py # pytest配置 ├── fixtures/ # 测试数据 │ ├── create_test_data.py │ ├── sample_products.xlsx │ ├── sample_categories.jsonl │ └── sample_batch_response.jsonl ├── test_models.py # 测试数据模型 ├── test_config.py # 测试配置管理 ├── test_file_handler.py # 测试文件处理 ├── test_batch_client.py # 测试Batch客户端 ├── test_direct_client.py # 测试Direct客户端 ├── test_client_factory.py # 测试客户端工厂 ├── test_prompt_builder.py # 测试Prompt构建 └── test_main.py # 测试主流程 ``` ### 8.2 核心测试用例 #### 8.2.1 文件处理测试 ```python class TestFileHandler: """测试FileHandler组合类""" def test_read_products_success(self): """测试成功读取产品列表""" pass def test_read_products_with_full_data(self): """测试读取完整数据""" pass def test_read_categories_success(self): """测试成功读取类别配置""" pass def test_merge_and_write_csv(self): """测试合并数据并写入CSV""" pass def test_read_file_not_found(self): """测试读取不存在的文件""" pass def test_read_missing_columns(self): """测试缺少必要列的Excel""" pass ``` #### 8.2.2 LLM客户端测试 ```python class TestBatchClient: """测试BatchClient类""" def test_classify_products_success(self): """测试Batch模式分类成功""" pass def test_upload_file_success(self): """测试成功上传文件""" pass def test_wait_for_completion_timeout(self): """测试等待超时""" pass class TestDirectClient: """测试DirectClient类""" def test_classify_products_success(self): """测试Direct模式分类成功""" pass def test_classify_with_retry(self): """测试带重试的分类""" pass def test_parse_response(self): """测试解析响应""" pass class TestClientFactory: """测试LLMClientFactory类""" def test_create_batch_client(self): """测试创建Batch客户端""" pass def test_create_direct_client(self): """测试创建Direct客户端""" pass def test_invalid_mode(self): """测试无效的模式""" pass ``` #### 8.2.3 主流程测试 ```python class TestMain: """测试主流程""" def test_main_success_batch_mode(self): """测试Batch模式成功流程""" pass def test_main_success_direct_mode(self): """测试Direct模式成功流程""" pass def test_main_with_csv_output(self): """测试带CSV输出的流程""" pass def test_main_file_not_found(self): """测试输入文件不存在""" pass def test_main_api_error(self): """测试API调用失败""" pass ``` ### 8.3 集成测试 ```python class TestIntegration: """集成测试""" @pytest.mark.integration def test_end_to_end_batch_mode(self): """Batch模式端到端测试""" pass @pytest.mark.integration def test_end_to_end_direct_mode(self): """Direct模式端到端测试""" pass @pytest.mark.integration def test_end_to_end_with_csv(self): """带CSV输出的端到端测试""" pass ``` ## 9 配置管理 ### 9.1 配置文件(config.yaml) ```yaml # API配置 api: mode: "batch" # 调用模式:batch(批量模式) 或 direct(直接模式) api_key: null # API密钥,若为null则从环境变量DASHSCOPE_API_KEY读取 base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1" model: "qwen-flash" completion_window: "24h" # Batch模式的等待时间 max_retries: 3 # Direct模式的最大重试次数 retry_delay: 1 # Direct模式的重试延迟(秒) # 文件路径配置 files: input_file: "input/product_list.xlsx" category_file: "config/product_type.jsonl" output_file: "output/classification_result.jsonl" csv_output_file: "output/classification_result.csv" # CSV输出(可选) temp_request_file: "temp_batch_requests.jsonl" # 日志配置 logging: level: "INFO" format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" file: "logs/classifier.log" # Batch配置 batch: poll_interval: 60 # 轮询间隔(秒) max_wait_time: 86400 # 最大等待时间(秒,24小时) ``` ### 9.2 环境变量 | 变量名 | 说明 | 是否必需 | 默认值 | |--------|------|----------|--------| | DASHSCOPE_API_KEY | 通义千问API Key | 是 | 无 | ### 9.3 命令行参数 ```bash python -m src.main \ --config config/config.yaml \ --mode direct \ --input-file input/product_list.xlsx \ --output-file output/result.jsonl \ --csv-output-file output/result.csv ``` ## 10 错误处理 ### 10.1 错误分类 | 错误类型 | 处理策略 | 日志级别 | |---------|---------|---------| | 文件不存在 | 立即终止,提示用户 | ERROR | | 文件格式错误 | 立即终止,提示具体错误 | ERROR | | API认证失败 | 立即终止,检查API Key | ERROR | | 网络错误 | 重试3次,失败后终止 | WARNING → ERROR | | Batch任务失败 | 记录错误,保存错误文件 | ERROR | | 部分产品分类失败 | 继续处理,记录失败项 | WARNING | ### 10.2 重试机制 - **API调用失败**:最多重试3次,指数退避(1s, 2s, 4s) - **文件下载失败**:最多重试3次 - **任务状态查询**:持续轮询直到完成或超时 ### 10.3 日志记录 所有错误信息应记录到日志文件(logs/目录): - **INFO级别**:正常流程信息 - **WARNING级别**:警告信息(如部分产品失败) - **ERROR级别**:错误信息(如文件不存在、API失败) ## 11 依赖项 ### 11.1 Python版本 - Python >= 3.9 ### 11.2 第三方库 - openai:通义千问API调用 - pandas:Excel文件读取和CSV写入 - pydantic:数据模型验证 - openpyxl:Excel文件解析 - pyyaml:配置文件解析 - pytest:单元测试框架 ### 11.3 依赖管理 使用Poetry进行依赖管理,配置文件为`pyproject.toml`。 ## 12 输入输出格式说明 ### 12.1 输入文件格式 **产品列表Excel(input/product_list.xlsx)**: - 必须包含列:`产品编号`、`产品名称`、`景区名称` - 编码:UTF-8 - 格式:.xlsx **产品类别配置JSONL(config/product_type.jsonl)**: - 每行一个JSON对象 - 必须包含字段:`category`、`type`、`sub_type` - 编码:UTF-8 - 格式示例: ```json {"category":"门票","type":"自然类","sub_type":"自然风光"} {"category":"住宿","type":"商务酒店","sub_type":""} ``` ### 12.2 输出文件格式 **分类结果JSONL(output/classification_result.jsonl)**: - 每行一个JSON对象 - 包含字段:`product_id`、`category`、`type`、`sub_type` - 编码:UTF-8 **分类结果CSV(output/classification_result.csv,可选)**: - 包含Excel原始所有列 + 3个分类列 - 编码:UTF-8-BOM(确保Excel正确识别中文) - 格式:标准CSV --- ## 附录:项目使用说明 ### A.1 安装依赖 ```bash poetry install ``` ### A.2 配置API Key ```bash export DASHSCOPE_API_KEY="your_api_key" ``` ### A.3 运行程序 ```bash # 使用Batch模式 python -m src.main --mode batch # 使用Direct模式 python -m src.main --mode direct # 指定配置文件 python -m src.main --config config/config.yaml ``` ### A.4 运行测试 ```bash # 运行所有测试 poetry run pytest # 运行特定测试 poetry run pytest tests/test_main.py # 运行集成测试 poetry run pytest -m integration ```