# TravelContentCreator 可维护架构设计 V2 > 设计目标:**功能不变 + 易维护 + 易扩展** --- ## 一、核心设计原则 ### 1.1 功能不变保证 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 功能兼容性保证策略 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 1. API 接口签名不变 │ │ - 所有现有 endpoint 路径保持不变 │ │ - 请求/响应格式保持兼容 │ │ - 新功能通过新 endpoint 或可选参数添加 │ │ │ │ 2. 内部重构采用 "Strangler Fig" 模式 │ │ - 新代码包裹旧代码 │ │ - 逐步替换,而非一次性重写 │ │ - 每次修改都可独立测试和回滚 │ │ │ │ 3. 数据库零迁移 │ │ - 不改表结构 │ │ - Repository 只是代码层抽象 │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ### 1.2 可维护性设计原则 | 原则 | 具体做法 | 收益 | |-----|---------|------| | **单一职责** | 每个类/模块只做一件事 | 改一处不影响其他 | | **依赖倒置** | 依赖抽象接口,不依赖具体实现 | 可替换、可测试 | | **开闭原则** | 对扩展开放,对修改关闭 | 新增功能不改旧代码 | | **显式依赖** | 所有依赖通过构造函数注入 | 一眼看清依赖关系 | --- ## 二、分层架构详解 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ 架构分层图 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Layer 1: API Layer (接口层) │ │ │ │ │ │ │ │ 职责:HTTP 请求处理、参数校验、响应格式化 │ │ │ │ 规则:不包含任何业务逻辑,只做"转发" │ │ │ │ │ │ │ │ api/routers/ │ │ │ │ ├── poster.py → 海报相关 endpoint │ │ │ │ ├── tweet.py → 内容生成 endpoint │ │ │ │ ├── data.py → 基础数据 endpoint │ │ │ │ └── integration.py → 整合服务 endpoint │ │ │ │ │ │ │ │ 修改场景:新增 API、修改参数校验、调整响应格式 │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Layer 2: Application Layer (应用服务层) │ │ │ │ │ │ │ │ 职责:业务流程编排、调用领域层、事务管理 │ │ │ │ 规则:协调多个领域服务,不包含具体算法实现 │ │ │ │ │ │ │ │ api/services/ │ │ │ │ ├── poster_service.py → 海报生成流程编排 (精简后 <200行) │ │ │ │ ├── content_service.py → 内容生成流程编排 (精简后 <200行) │ │ │ │ └── data_service.py → 数据查询流程编排 │ │ │ │ │ │ │ │ 修改场景:调整业务流程、新增业务规则、修改调用顺序 │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Layer 3: Domain Layer (领域层) │ │ │ │ │ │ │ │ 职责:核心业务逻辑、算法实现 │ │ │ │ 规则:纯业务逻辑,不依赖框架、不依赖数据库 │ │ │ │ │ │ │ │ domain/ │ │ │ │ ├── poster/ → 海报生成领域 │ │ │ │ │ ├── engines/ → 各种引擎 (渲染、导出等) │ │ │ │ │ ├── templates/ → 模板实现 │ │ │ │ │ └── models/ → 领域模型 │ │ │ │ ├── content/ → 内容生成领域 │ │ │ │ │ ├── engines/ → 选题、内容、审核引擎 │ │ │ │ │ └── models/ → 领域模型 │ │ │ │ └── spider/ → 爬虫领域 │ │ │ │ │ │ │ │ 修改场景:新增模板、修改算法、新增生成引擎 │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Layer 4: Infrastructure Layer (基础设施层) │ │ │ │ │ │ │ │ 职责:技术实现细节 (数据库、外部API、文件存储) │ │ │ │ 规则:可替换的技术实现,通过接口与上层解耦 │ │ │ │ │ │ │ │ infrastructure/ │ │ │ │ ├── database/ → 数据库访问 │ │ │ │ │ ├── connection.py → 连接池管理 │ │ │ │ │ └── repositories/ → 各表的 Repository │ │ │ │ ├── ai/ → AI 服务 │ │ │ │ │ └── llm_client.py → LLM 调用封装 │ │ │ │ ├── storage/ → 存储服务 │ │ │ │ │ └── file_storage.py → 文件存储 │ │ │ │ └── config/ → 配置管理 │ │ │ │ │ │ │ │ 修改场景:换数据库、换AI提供商、换存储方式 │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` --- ## 三、二次开发扩展点设计 ### 3.1 新增海报模板 (最常见场景) **现状问题**:需要修改 `poster.py` 的 3000+ 行代码 **重构后**:只需 3 步 ```python # Step 1: 创建新模板文件 # domain/poster/templates/new_style/renderer.py from domain.poster.templates.base import BaseTemplate class NewStyleTemplate(BaseTemplate): """新风格模板""" template_id = "new_style" template_name = "新风格" def render(self, images: List[Image], content: Dict) -> Image: """实现渲染逻辑""" # 你的渲染代码 pass def to_fabric_json(self, images: List[Image], content: Dict) -> Dict: """实现 Fabric.js JSON 导出""" # 你的导出代码 pass ``` ```python # Step 2: 注册模板 (可选,支持自动发现) # domain/poster/templates/__init__.py from .new_style.renderer import NewStyleTemplate # 模板会被自动注册到 TemplateRegistry ``` ```sql -- Step 3: 数据库添加记录 (可选,也可以用默认配置) INSERT INTO poster_template (id, name, handler_path, class_name, is_active) VALUES ('new_style', '新风格', 'domain.poster.templates.new_style.renderer', 'NewStyleTemplate', 1); ``` **扩展点设计**: ```python # domain/poster/templates/base.py from abc import ABC, abstractmethod from typing import Dict, List, Optional from PIL import Image class BaseTemplate(ABC): """模板基类 - 定义扩展点""" # ========== 必须实现的方法 ========== @abstractmethod def render(self, images: List[Image], content: Dict) -> Image: """渲染 PNG 图片 (必须实现)""" pass # ========== 可选实现的方法 (有默认实现) ========== def to_fabric_json(self, images: List[Image], content: Dict) -> Optional[Dict]: """导出 Fabric.js JSON (可选)""" return None # 默认不支持 def to_psd(self, images: List[Image], content: Dict) -> Optional[bytes]: """导出 PSD 文件 (可选)""" return None # 默认不支持 def get_decoration_images(self, content: Dict) -> List[Dict]: """获取装饰图片 (可选)""" return [] # 默认无装饰图 # ========== 工具方法 (子类可直接使用) ========== def resize_image(self, image: Image, size: tuple) -> Image: """调整图片尺寸""" return self.image_processor.resize(image, size) def apply_glass_effect(self, image: Image, intensity: float) -> Image: """应用毛玻璃效果""" return self.effect_engine.glass_blur(image, intensity) ``` ### 3.2 新增内容生成类型 **扩展点设计**: ```python # domain/content/engines/base.py from abc import ABC, abstractmethod class BaseContentEngine(ABC): """内容生成引擎基类""" @abstractmethod async def generate(self, params: Dict) -> Dict: """生成内容""" pass def get_prompt_template(self) -> str: """获取提示词模板路径 (可覆盖)""" return f"resource/prompt/{self.engine_type}/user.txt" # 新增引擎示例 class VideoScriptEngine(BaseContentEngine): """视频脚本生成引擎""" engine_type = "video_script" async def generate(self, params: Dict) -> Dict: prompt = self.build_prompt(params) result = await self.llm_client.generate(prompt) return self.parse_result(result) ``` ### 3.3 新增数据源 **扩展点设计**: ```python # infrastructure/database/repositories/base.py from abc import ABC, abstractmethod from typing import List, Optional, Dict class BaseRepository(ABC): """Repository 基类""" def __init__(self, db_pool): self.db_pool = db_pool @abstractmethod async def find_by_id(self, id: int) -> Optional[Dict]: pass @abstractmethod async def find_all(self, **filters) -> List[Dict]: pass # 新增 Repository 示例 # infrastructure/database/repositories/hotel_repo.py class HotelRepository(BaseRepository): """酒店数据 Repository""" table_name = "hotel" async def find_by_id(self, id: int) -> Optional[Dict]: query = f"SELECT * FROM {self.table_name} WHERE id = %s" return await self.db_pool.fetchone(query, (id,)) async def find_by_city(self, city: str) -> List[Dict]: query = f"SELECT * FROM {self.table_name} WHERE city = %s" return await self.db_pool.fetchall(query, (city,)) ``` --- ## 四、目录结构设计 ``` TravelContentCreator/ ├── api/ # Layer 1: API 层 │ ├── __init__.py │ ├── main.py # FastAPI 应用入口 │ ├── middleware/ # 中间件 │ │ ├── __init__.py │ │ ├── exception_handler.py # 统一异常处理 │ │ └── logging.py # 请求日志 │ ├── routers/ # 路由定义 (保持现有结构) │ │ ├── poster.py │ │ ├── tweet.py │ │ ├── data.py │ │ └── integration.py │ ├── models/ # API 请求/响应模型 │ │ ├── request.py │ │ └── response.py │ └── services/ # Layer 2: 应用服务层 (精简后) │ ├── __init__.py │ ├── poster_service.py # <200 行,只做流程编排 │ ├── content_service.py # <200 行 │ └── data_service.py # <200 行 │ ├── domain/ # Layer 3: 领域层 (新增) │ ├── __init__.py │ ├── poster/ # 海报生成领域 │ │ ├── __init__.py │ │ ├── engines/ # 引擎 │ │ │ ├── __init__.py │ │ │ ├── render_engine.py # PNG 渲染 │ │ │ ├── fabric_engine.py # Fabric.js JSON │ │ │ ├── psd_engine.py # PSD 导出 │ │ │ └── image_engine.py # 图片处理 │ │ ├── templates/ # 模板 (从 poster/templates 迁移) │ │ │ ├── __init__.py │ │ │ ├── base.py # 模板基类 │ │ │ ├── registry.py # 模板注册表 │ │ │ ├── vibrant/ # 活力风格 │ │ │ │ ├── __init__.py │ │ │ │ ├── renderer.py │ │ │ │ ├── layout.py │ │ │ │ └── effects.py │ │ │ └── business/ # 商务风格 │ │ │ └── ... │ │ └── models/ # 领域模型 │ │ ├── __init__.py │ │ ├── poster_content.py │ │ └── render_result.py │ │ │ ├── content/ # 内容生成领域 │ │ ├── __init__.py │ │ ├── engines/ │ │ │ ├── __init__.py │ │ │ ├── topic_engine.py # 选题生成 │ │ │ ├── content_engine.py # 内容生成 │ │ │ ├── judge_engine.py # 内容审核 │ │ │ └── prompt_engine.py # 提示词构建 │ │ └── models/ │ │ ├── topic.py │ │ └── article.py │ │ │ └── spider/ # 爬虫领域 │ ├── __init__.py │ ├── xhs_crawler.py │ └── cookie_pool.py │ ├── infrastructure/ # Layer 4: 基础设施层 (新增) │ ├── __init__.py │ ├── database/ │ │ ├── __init__.py │ │ ├── connection.py # 连接池 (异步) │ │ └── repositories/ # 按表拆分 │ │ ├── __init__.py │ │ ├── base.py │ │ ├── scenic_spot_repo.py │ │ ├── product_repo.py │ │ ├── style_repo.py │ │ ├── audience_repo.py │ │ └── template_repo.py │ ├── ai/ │ │ ├── __init__.py │ │ ├── llm_client.py # LLM 客户端抽象 │ │ └── openai_impl.py # OpenAI 实现 │ ├── storage/ │ │ ├── __init__.py │ │ └── file_storage.py │ └── config/ │ ├── __init__.py │ └── settings.py │ ├── core/ # 保留现有核心模块 (逐步迁移) │ ├── ai/ # → 迁移到 infrastructure/ai │ ├── config/ # → 迁移到 infrastructure/config │ ├── document/ # → 迁移到 domain/document │ └── xhs_spider/ # → 迁移到 domain/spider │ ├── poster/ # 保留现有海报模块 (逐步迁移) │ └── templates/ # → 迁移到 domain/poster/templates │ ├── tweet/ # 保留现有内容模块 (逐步迁移) │ ├── topic_generator.py # → 迁移到 domain/content/engines │ ├── content_generator.py # → 迁移到 domain/content/engines │ └── content_judger.py # → 迁移到 domain/content/engines │ ├── utils/ # 工具层 (保留) │ ├── file_io.py │ ├── image_processor.py │ └── prompts.py │ ├── resource/ # 资源文件 (保留) │ └── prompt/ │ ├── config/ # 配置文件 (保留) │ └── tests/ # 测试 (增强) ├── unit/ # 单元测试 │ ├── domain/ │ └── infrastructure/ ├── integration/ # 集成测试 └── api/ # API 测试 ``` --- ## 五、渐进式迁移策略 ### 5.1 迁移顺序 ``` Phase 1: 基础设施层 (风险最低) ├── 1.1 创建 infrastructure/database/connection.py (异步连接池) ├── 1.2 创建 infrastructure/database/repositories/ (按表拆分) ├── 1.3 旧 DatabaseService 调用新 Repository (适配器模式) └── 1.4 验证:所有现有功能正常 Phase 2: 领域层抽取 (核心) ├── 2.1 创建 domain/poster/engines/ (从 poster.py 抽取) ├── 2.2 创建 domain/poster/templates/ (迁移现有模板) ├── 2.3 创建 domain/content/engines/ (从 tweet/ 迁移) └── 2.4 验证:所有现有功能正常 Phase 3: 应用服务层精简 ├── 3.1 精简 api/services/poster_service.py ├── 3.2 精简 api/services/content_service.py (原 tweet.py) └── 3.3 验证:所有现有功能正常 Phase 4: 清理旧代码 ├── 4.1 删除旧的 DatabaseService 中已迁移的代码 ├── 4.2 删除旧的 poster.py 中已迁移的代码 └── 4.3 更新 import 路径 ``` ### 5.2 每个 Phase 的验证清单 ```yaml 验证清单: API 测试: - [ ] POST /poster/generate 正常 - [ ] POST /tweet/topic 正常 - [ ] POST /tweet/content 正常 - [ ] GET /data/scenic-spots 正常 - [ ] GET /data/products 正常 功能测试: - [ ] 海报生成 PNG 正常 - [ ] 海报生成 Fabric.js JSON 正常 - [ ] 海报生成 PSD 正常 - [ ] 选题生成正常 - [ ] 内容生成正常 - [ ] 内容审核正常 性能测试: - [ ] 响应时间无明显退化 - [ ] 内存使用无明显增加 ``` --- ## 六、关键代码示例 ### 6.1 精简后的 PosterService ```python # api/services/poster_service.py (精简后 ~150 行) from typing import Dict, Any, Optional, List from domain.poster.engines import RenderEngine, FabricEngine, PSDEngine, ImageEngine from domain.poster.templates import TemplateRegistry from infrastructure.database.repositories import TemplateRepository from infrastructure.ai import LLMClient class PosterService: """海报服务 - 只做流程编排""" def __init__( self, template_repo: TemplateRepository, render_engine: RenderEngine, fabric_engine: FabricEngine, psd_engine: PSDEngine, image_engine: ImageEngine, llm_client: LLMClient ): self.template_repo = template_repo self.render_engine = render_engine self.fabric_engine = fabric_engine self.psd_engine = psd_engine self.image_engine = image_engine self.llm_client = llm_client self.template_registry = TemplateRegistry() async def generate_poster( self, template_id: str, images_base64: List[str], content: Optional[Dict] = None, generate_psd: bool = False, generate_fabric_json: bool = False ) -> Dict[str, Any]: """生成海报 - 编排各个引擎""" # 1. 获取模板 template = self.template_registry.get(template_id) if not template: raise ValueError(f"模板不存在: {template_id}") # 2. 处理图片 images = [self.image_engine.decode_base64(img) for img in images_base64] # 3. 生成内容 (如果需要) if not content: content = await self._generate_content_with_llm(template_id) # 4. 渲染 PNG png_result = self.render_engine.render(template, images, content) # 5. 可选:生成 Fabric.js JSON fabric_json = None if generate_fabric_json: fabric_json = self.fabric_engine.export(template, images, content) # 6. 可选:生成 PSD psd_data = None if generate_psd: psd_data = self.psd_engine.export(template, images, content) # 7. 返回结果 return { "success": True, "image_base64": self.image_engine.to_base64(png_result), "fabric_json": fabric_json, "psd_base64": psd_data, "template_id": template_id } async def _generate_content_with_llm(self, template_id: str) -> Dict: """使用 LLM 生成内容""" # 调用 LLM 客户端 pass ``` ### 6.2 模板注册表 ```python # domain/poster/templates/registry.py from typing import Dict, Type, Optional from .base import BaseTemplate import importlib import logging logger = logging.getLogger(__name__) class TemplateRegistry: """模板注册表 - 支持动态注册和自动发现""" _instance = None _templates: Dict[str, Type[BaseTemplate]] = {} def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._auto_discover() return cls._instance def _auto_discover(self): """自动发现并注册模板""" # 扫描 templates 目录下的所有模板 template_modules = [ 'domain.poster.templates.vibrant.renderer', 'domain.poster.templates.business.renderer', # 新模板会自动被发现 ] for module_path in template_modules: try: module = importlib.import_module(module_path) for attr_name in dir(module): attr = getattr(module, attr_name) if (isinstance(attr, type) and issubclass(attr, BaseTemplate) and attr is not BaseTemplate): self.register(attr) except Exception as e: logger.warning(f"加载模板模块失败: {module_path}, {e}") def register(self, template_class: Type[BaseTemplate]): """注册模板""" template_id = getattr(template_class, 'template_id', template_class.__name__) self._templates[template_id] = template_class logger.info(f"注册模板: {template_id}") def get(self, template_id: str) -> Optional[BaseTemplate]: """获取模板实例""" template_class = self._templates.get(template_id) if template_class: return template_class() return None def list_all(self) -> Dict[str, dict]: """列出所有可用模板""" return { tid: { 'name': getattr(cls, 'template_name', tid), 'description': getattr(cls, 'description', '') } for tid, cls in self._templates.items() } ``` ### 6.3 Repository 示例 ```python # infrastructure/database/repositories/scenic_spot_repo.py from typing import List, Optional, Dict from .base import BaseRepository class ScenicSpotRepository(BaseRepository): """景区数据 Repository""" table_name = "scenic_spot" async def find_by_id(self, id: int) -> Optional[Dict]: query = f"SELECT * FROM {self.table_name} WHERE id = %s" return await self.db_pool.fetchone(query, (id,)) async def find_by_name(self, name: str) -> Optional[Dict]: query = f"SELECT * FROM {self.table_name} WHERE name = %s" return await self.db_pool.fetchone(query, (name,)) async def find_all_active(self) -> List[Dict]: query = f"SELECT * FROM {self.table_name} WHERE is_active = 1" return await self.db_pool.fetchall(query) async def search(self, keyword: str, limit: int = 20) -> List[Dict]: query = f""" SELECT * FROM {self.table_name} WHERE name LIKE %s OR description LIKE %s LIMIT %s """ pattern = f"%{keyword}%" return await self.db_pool.fetchall(query, (pattern, pattern, limit)) ``` --- ## 七、常见二次开发场景速查 | 场景 | 修改位置 | 影响范围 | |-----|---------|---------| | 新增海报模板 | `domain/poster/templates/新模板/` | 仅新模板 | | 修改现有模板渲染 | `domain/poster/templates/xxx/renderer.py` | 仅该模板 | | 新增内容生成类型 | `domain/content/engines/新引擎.py` | 仅新引擎 | | 修改 LLM 提示词 | `resource/prompt/xxx/` | 仅该提示词 | | 新增数据表查询 | `infrastructure/database/repositories/新repo.py` | 仅新 repo | | 修改 API 参数 | `api/routers/xxx.py` + `api/models/` | 仅该 API | | 新增 API 接口 | `api/routers/xxx.py` | 仅新接口 | | 换 AI 提供商 | `infrastructure/ai/新实现.py` | 仅 AI 层 | --- ## 八、测试策略 ```python # tests/unit/domain/poster/test_vibrant_template.py import pytest from domain.poster.templates.vibrant.renderer import VibrantTemplate from PIL import Image class TestVibrantTemplate: """Vibrant 模板单元测试""" @pytest.fixture def template(self): return VibrantTemplate() @pytest.fixture def sample_image(self): return Image.new('RGB', (1350, 1800), color='blue') @pytest.fixture def sample_content(self): return { "title": "测试标题", "slogan": "测试副标题", "price": "199", "content_items": ["项目1", "项目2"] } def test_render_returns_image(self, template, sample_image, sample_content): """测试渲染返回图片""" result = template.render([sample_image], sample_content) assert isinstance(result, Image.Image) assert result.size == (1350, 1800) def test_to_fabric_json_returns_dict(self, template, sample_image, sample_content): """测试 Fabric.js 导出""" result = template.to_fabric_json([sample_image], sample_content) assert isinstance(result, dict) assert "objects" in result ``` --- ## 九、面向 AIGC 扩展的插件化架构 ### 9.1 未来可能的 AIGC 功能 | 功能 | 描述 | 输入 | 输出 | |-----|------|------|------| | 视频脚本生成 | 根据景点生成短视频脚本 | 景点信息、风格 | 分镜脚本 JSON | | AI 配音 | 文字转语音 | 文本、音色 | 音频文件 | | 智能抠图 | 移除图片背景 | 图片 | 透明背景图 | | 图片风格迁移 | 将图片转换为特定风格 | 图片、风格 | 风格化图片 | | AI 视频生成 | 图片生成视频 | 图片序列、音频 | 视频文件 | | 智能排版 | 自动调整海报布局 | 内容、约束 | 布局方案 | | 多语言翻译 | 内容多语言适配 | 文本、目标语言 | 翻译文本 | ### 9.2 插件化引擎架构 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ AIGC 插件化架构 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ 统一任务入口 (Python) │ │ │ │ │ │ │ │ POST /api/v2/aigc/execute │ │ │ │ { │ │ │ │ "engine": "poster_generate", // 引擎类型 │ │ │ │ "params": {...}, // 引擎参数 │ │ │ │ "async": true // 是否异步 │ │ │ │ } │ │ │ │ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ 引擎注册表 (EngineRegistry) │ │ │ │ │ │ │ │ engines = { │ │ │ │ "poster_generate": PosterGenerateEngine, │ │ │ │ "content_generate": ContentGenerateEngine, │ │ │ │ "topic_generate": TopicGenerateEngine, │ │ │ │ "content_judge": ContentJudgeEngine, │ │ │ │ "video_script": VideoScriptEngine, # 未来新增 │ │ │ │ "ai_voice": AIVoiceEngine, # 未来新增 │ │ │ │ "image_style": ImageStyleEngine, # 未来新增 │ │ │ │ } │ │ │ │ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ 引擎基类 (BaseAIGCEngine) │ │ │ │ │ │ │ │ class BaseAIGCEngine(ABC): │ │ │ │ engine_id: str # 引擎唯一标识 │ │ │ │ engine_name: str # 引擎显示名称 │ │ │ │ version: str # 版本号 │ │ │ │ │ │ │ │ @abstractmethod │ │ │ │ async def execute(params: Dict) -> EngineResult │ │ │ │ │ │ │ │ @abstractmethod │ │ │ │ def get_param_schema() -> Dict # 参数 JSON Schema │ │ │ │ │ │ │ │ def validate_params(params) -> bool │ │ │ │ def estimate_duration(params) -> int # 预估耗时(秒) │ │ │ │ def get_progress(task_id) -> int # 获取进度(0-100) │ │ │ │ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ 具体引擎实现 │ │ │ │ │ │ │ │ domain/aigc/engines/ │ │ │ │ ├── poster_generate.py # 海报生成引擎 │ │ │ │ ├── content_generate.py # 内容生成引擎 │ │ │ │ ├── topic_generate.py # 选题生成引擎 │ │ │ │ ├── content_judge.py # 内容审核引擎 │ │ │ │ ├── video_script.py # 视频脚本引擎 (未来) │ │ │ │ └── ai_voice.py # AI配音引擎 (未来) │ │ │ │ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` ### 9.3 新增 AIGC 功能的标准流程 **只需 3 步,无需修改现有代码:** ```python # Step 1: 创建引擎文件 # domain/aigc/engines/video_script.py from .base import BaseAIGCEngine, EngineResult class VideoScriptEngine(BaseAIGCEngine): """视频脚本生成引擎""" engine_id = "video_script" engine_name = "视频脚本生成" version = "1.0.0" def get_param_schema(self) -> Dict: """定义参数结构""" return { "type": "object", "required": ["scenic_spot_id", "duration"], "properties": { "scenic_spot_id": {"type": "integer", "description": "景点ID"}, "duration": {"type": "integer", "description": "视频时长(秒)"}, "style": {"type": "string", "enum": ["vlog", "documentary", "promo"]} } } async def execute(self, params: Dict) -> EngineResult: """执行生成""" # 1. 获取景点信息 spot = await self.scenic_spot_repo.find_by_id(params["scenic_spot_id"]) # 2. 构建提示词 prompt = self.prompt_engine.build("video_script", spot, params) # 3. 调用 LLM result = await self.llm_client.generate(prompt) # 4. 解析结果 script = self.parse_script(result) return EngineResult( success=True, data={"script": script, "scenes": len(script["scenes"])} ) def estimate_duration(self, params: Dict) -> int: """预估耗时""" return 30 # 约30秒 ``` ```python # Step 2: 注册引擎 (自动发现,或手动注册) # domain/aigc/engines/__init__.py from .video_script import VideoScriptEngine # 引擎会被自动注册到 EngineRegistry ``` ```python # Step 3: (可选) 添加提示词模板 # resource/prompt/video_script/system.txt # resource/prompt/video_script/user.txt ``` **完成!** 新引擎自动可用: ```bash # 调用新引擎 curl -X POST /api/v2/aigc/execute \ -d '{"engine": "video_script", "params": {"scenic_spot_id": 1, "duration": 60}}' ``` ### 9.4 与 Java 端的对接 ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Java ↔ Python 对接架构 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Java (zwy_picture) Python (TravelContentCreator) │ │ ┌─────────────────────────┐ ┌─────────────────────────────┐ │ │ │ WebSocketTaskController │ │ /api/v2/aigc/ │ │ │ │ │ │ │ │ │ │ POST /tasks/submit │───HTTP────▶│ POST /execute │ │ │ │ { │ │ { │ │ │ │ taskType: "aigc", │ │ engine: "poster_generate",│ │ │ │ params: { │ │ params: {...}, │ │ │ │ engine: "poster..",│ │ callback_url: "..." │ │ │ │ ... │ │ } │ │ │ │ } │ │ │ │ │ │ } │ │ Response: │ │ │ │ │◀──HTTP─────│ {task_id, status, ...} │ │ │ │ │ │ │ │ │ │ GET /tasks/{id}/status │───HTTP────▶│ GET /task/{id}/status │ │ │ │ │◀──HTTP─────│ {progress, result, ...} │ │ │ │ │ │ │ │ │ │ (WebSocket通知用户) │◀─Callback──│ POST callback_url │ │ │ │ │ │ {task_id, status, result} │ │ │ └─────────────────────────┘ └─────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` **Java 端简化后的代码:** ```java // WebSocketTaskController.java - 简化版 @PostMapping("/submit") public BaseResponse> submitTask(@RequestBody Map request) { String taskType = (String) request.get("taskType"); // 所有 AIGC 任务统一走一个入口 if (taskType.equals("aigc") || isAIGCTask(taskType)) { return handleAIGCTask(userId, request); } // 其他非 AIGC 任务... } private BaseResponse> handleAIGCTask(Long userId, Map request) { // 直接转发给 Python 端,不需要 switch-case Map response = aigcServiceClient.execute( request.get("engine"), request.get("params"), getCallbackUrl(userId) ); return ResultUtils.success(response); } ``` ### 9.5 引擎能力查询 API ```yaml # GET /api/v2/aigc/engines # 返回所有可用引擎及其参数结构 Response: engines: - id: "poster_generate" name: "海报生成" version: "2.0.0" param_schema: type: object required: [template_id, images] properties: template_id: {type: string} images: {type: array, items: {type: string}} content: {type: object} estimated_duration: 15 - id: "content_generate" name: "内容生成" version: "1.0.0" param_schema: {...} - id: "video_script" name: "视频脚本生成" version: "1.0.0" param_schema: {...} ``` ### 9.6 更新后的目录结构 ``` domain/ ├── aigc/ # AIGC 统一入口 (新增) │ ├── __init__.py │ ├── engine_registry.py # 引擎注册表 │ ├── engine_executor.py # 引擎执行器 │ ├── task_manager.py # 任务管理 │ └── engines/ # 所有引擎 │ ├── __init__.py │ ├── base.py # 引擎基类 │ ├── poster_generate.py # 海报生成 (封装现有逻辑) │ ├── content_generate.py # 内容生成 (封装现有逻辑) │ ├── topic_generate.py # 选题生成 (封装现有逻辑) │ ├── content_judge.py # 内容审核 (封装现有逻辑) │ └── _future/ # 未来扩展 │ ├── video_script.py │ ├── ai_voice.py │ └── image_style.py │ ├── poster/ # 海报领域 (保持) │ └── ... │ ├── content/ # 内容领域 (保持) │ └── ... │ └── spider/ # 爬虫领域 (保持) └── ... ``` --- ## 十、扩展能力对比 | 场景 | 现有架构 | 新架构 | |-----|---------|-------| | **新增 AIGC 功能** | 改 router + service + 多处 | 只加 1 个引擎文件 | | **Java 端对接** | 每个功能单独对接 | 统一 `/aigc/execute` | | **参数校验** | 各处重复实现 | 引擎自带 Schema | | **进度查询** | 无统一方案 | 引擎统一接口 | | **能力发现** | 硬编码 | 动态查询 API | --- 这个架构设计确保了: 1. **功能不变**:API 签名不变,渐进式迁移 2. **易维护**:每个模块职责单一,改一处不影响其他 3. **易扩展**:新增 AIGC 功能只需添加引擎文件,不改现有代码 4. **统一对接**:Java 端通过统一入口调用所有 AIGC 能力