550 lines
23 KiB
Python
550 lines
23 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
海报服务层 - 重构版本
|
||
封装核心功能,支持基于模板的动态内容生成和海报创建
|
||
"""
|
||
|
||
|
||
import logging
|
||
import uuid
|
||
import time
|
||
import json
|
||
import importlib
|
||
import base64
|
||
from io import BytesIO
|
||
from typing import List, Dict, Any, Optional, Type, Union, cast
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from PIL import Image
|
||
|
||
from core.config import ConfigManager, PosterConfig
|
||
from core.ai import AIAgent
|
||
from utils.file_io import OutputManager
|
||
from utils.image_processor import ImageProcessor
|
||
from poster.templates.base_template import BaseTemplate
|
||
from api.services.database_service import DatabaseService
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PosterService:
|
||
"""海报服务类"""
|
||
|
||
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
|
||
"""初始化海报服务"""
|
||
self.ai_agent = ai_agent
|
||
self.config_manager = config_manager
|
||
self.output_manager = output_manager
|
||
self.db_service = DatabaseService(config_manager)
|
||
self._templates = {}
|
||
self._template_instances = {}
|
||
self._image_usage_tracker = {}
|
||
self._init_templates()
|
||
|
||
def _init_templates(self):
|
||
"""从数据库加载模板配置"""
|
||
try:
|
||
db_templates = self.db_service.get_active_poster_templates()
|
||
if db_templates:
|
||
self._templates = {t['id']: t for t in db_templates}
|
||
logger.info(f"从数据库加载了 {len(self._templates)} 个模板")
|
||
else:
|
||
self._load_default_templates()
|
||
logger.info("数据库无模板,使用默认模板配置")
|
||
except Exception as e:
|
||
logger.error(f"从数据库加载模板失败: {e}", exc_info=True)
|
||
self._load_default_templates()
|
||
|
||
def _load_default_templates(self):
|
||
"""加载默认模板配置"""
|
||
self._templates = {
|
||
'vibrant': {
|
||
'id': 'vibrant',
|
||
'name': '活力风格',
|
||
'handler_path': 'poster.templates.vibrant_template',
|
||
'class_name': 'VibrantTemplate',
|
||
'description': '适合景点、活动等充满活力的场景',
|
||
'is_active': True
|
||
},
|
||
'business': {
|
||
'id': 'business',
|
||
'name': '商务风格',
|
||
'handler_path': 'poster.templates.business_template',
|
||
'class_name': 'BusinessTemplate',
|
||
'description': '适合酒店、房地产等商务场景',
|
||
'is_active': True
|
||
}
|
||
}
|
||
|
||
def _load_template_handler(self, template_id: str) -> Optional[BaseTemplate]:
|
||
"""动态加载模板处理器"""
|
||
if template_id not in self._templates:
|
||
logger.error(f"未找到模板: {template_id}")
|
||
return None
|
||
|
||
# 如果已经实例化过,直接返回缓存的实例
|
||
if template_id in self._template_instances:
|
||
return self._template_instances[template_id]
|
||
|
||
template_info = self._templates[template_id]
|
||
handler_path = template_info.get('handler_path')
|
||
class_name = template_info.get('class_name')
|
||
|
||
if not handler_path or not class_name:
|
||
logger.error(f"模板 {template_id} 缺少 handler_path 或 class_name")
|
||
return None
|
||
|
||
try:
|
||
# 动态导入模块和类
|
||
module = importlib.import_module(handler_path)
|
||
template_class = getattr(module, class_name)
|
||
|
||
# 实例化模板
|
||
template_instance = template_class()
|
||
|
||
# 设置字体目录(如果配置了)
|
||
from core.config import PosterConfig
|
||
# poster_config = self.config_manager.get_config('poster', PosterConfig)
|
||
# if poster_config:
|
||
# font_dir = poster_config.font_dir
|
||
# if font_dir and hasattr(template_instance, 'set_font_dir'):
|
||
# template_instance.set_font_dir(font_dir)
|
||
|
||
# 缓存实例以便重用
|
||
self._template_instances[template_id] = template_instance
|
||
logger.info(f"成功加载模板处理器: {template_id} ({handler_path}.{class_name})")
|
||
|
||
return template_instance
|
||
except (ImportError, AttributeError) as e:
|
||
logger.error(f"加载模板处理器失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
def reload_templates(self):
|
||
"""重新加载模板信息"""
|
||
logger.info("重新加载模板信息...")
|
||
self._init_templates()
|
||
# 清除缓存的模板实例,以便重新加载
|
||
self._template_instances = {}
|
||
|
||
def get_available_templates(self) -> List[Dict[str, Any]]:
|
||
"""获取所有可用的模板信息"""
|
||
result = []
|
||
for tid in self._templates:
|
||
template = self._templates[tid]
|
||
if template.get('is_active', True): # 默认为激活状态
|
||
template_info = {
|
||
"id": template["id"],
|
||
"name": template["name"],
|
||
"description": template["description"],
|
||
"handlerPath": template.get("handler_path", ""),
|
||
"className": template.get("class_name", ""),
|
||
"isActive": template.get("is_active", True)
|
||
}
|
||
result.append(template_info)
|
||
return result
|
||
|
||
def get_template_info(self, template_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取指定模板的简化信息"""
|
||
template = self._templates.get(template_id)
|
||
if not template:
|
||
return None
|
||
return {
|
||
"id": template["id"],
|
||
"name": template["name"],
|
||
"description": template["description"],
|
||
"has_prompts": bool(template.get("system_prompt") and template.get("user_prompt_template")),
|
||
"input_format": template.get("input_format", {}),
|
||
"output_format": template.get("output_format", {}),
|
||
"is_active": template.get("is_active", False)
|
||
}
|
||
|
||
async def generate_poster(self,
|
||
template_id: str,
|
||
poster_content: Optional[Dict[str, Any]],
|
||
content_id: Optional[int],
|
||
product_id: Optional[int],
|
||
scenic_spot_id: Optional[int],
|
||
images_base64: Optional[List[str]] ,
|
||
num_variations: int = 1,
|
||
force_llm_generation: bool = False,
|
||
generate_psd: bool = False,
|
||
psd_output_path: Optional[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
统一的海报生成入口
|
||
|
||
Args:
|
||
template_id: 模板ID
|
||
poster_content: 用户提供的海报内容(可选)
|
||
content_id: 内容ID,用于从数据库获取内容(可选)
|
||
product_id: 产品ID,用于从数据库获取产品信息(可选)
|
||
scenic_spot_id: 景点ID,用于从数据库获取景点信息(可选)
|
||
images_base64: 图片base64编码,用于生成海报(可选)
|
||
num_variations: 需要生成的变体数量
|
||
force_llm_generation: 是否强制使用LLM生成内容
|
||
generate_psd: 是否生成PSD分层文件
|
||
psd_output_path: PSD文件输出路径(可选,默认自动生成)
|
||
|
||
Returns:
|
||
生成结果字典,包含PNG图像和可选的PSD文件
|
||
"""
|
||
start_time = time.time()
|
||
|
||
# 1. 动态加载模板处理器
|
||
template_handler = self._load_template_handler(template_id)
|
||
if not template_handler:
|
||
raise ValueError(f"无法为模板ID '{template_id}' 加载处理器。")
|
||
|
||
# 2. 准备内容 (LLM或用户提供)
|
||
final_content = poster_content
|
||
if force_llm_generation or not final_content:
|
||
logger.info(f"为模板 {template_id} 按需生成内容...")
|
||
final_content = await self._generate_content_with_llm(template_id, content_id, product_id, scenic_spot_id)
|
||
|
||
if not final_content:
|
||
raise ValueError("无法获取用于生成海报的内容")
|
||
|
||
# # 3. 准备图片
|
||
# images = []
|
||
# if image_ids:
|
||
# images = self.db_service.get_images_by_ids(image_ids)
|
||
# if not images:
|
||
# raise ValueError("无法获取指定的图片")
|
||
|
||
|
||
# # 3. 图片解码
|
||
images = None
|
||
# 获取模板的默认尺寸,如果获取不到则使用标准尺寸
|
||
template_size = getattr(template_handler, 'size', (900, 1200))
|
||
|
||
if images_base64 and images_base64.strip():
|
||
try:
|
||
# 移除可能存在的MIME类型前缀
|
||
if images_base64.startswith("data:"):
|
||
images_base64 = images_base64.split(",", 1)[1]
|
||
|
||
# 解码base64
|
||
image_bytes = base64.b64decode(images_base64)
|
||
|
||
# 创建PIL Image对象
|
||
images = Image.open(BytesIO(image_bytes))
|
||
logger.info("图片解码成功")
|
||
|
||
except Exception as e:
|
||
logger.error(f"图片解码失败: {e}")
|
||
# 创建一个与目标大小一致的纯黑底图
|
||
images = Image.new('RGB', template_size, color='black')
|
||
logger.info(f"创建默认黑色背景图,尺寸: {template_size}")
|
||
else:
|
||
logger.warning("未提供图片数据,使用默认黑色背景图")
|
||
# 创建一个与目标大小一致的纯黑底图
|
||
images = Image.new('RGB', template_size, color='black')
|
||
logger.info(f"创建默认黑色背景图,尺寸: {template_size}")
|
||
|
||
# 4. 调用模板生成海报
|
||
try:
|
||
posters = template_handler.generate(
|
||
content=final_content,
|
||
images=images,
|
||
num_variations=num_variations
|
||
)
|
||
|
||
if not posters:
|
||
raise ValueError("模板未能生成有效的海报")
|
||
|
||
# 5. 保存海报并返回结果
|
||
variations = []
|
||
psd_files = []
|
||
i=0 ## 用于多个海报时,指定海报的编号,此时只有一个没有用上,但是接口开放着。
|
||
output_path = self._save_poster(posters, template_id, i)
|
||
if output_path:
|
||
# 获取图像尺寸
|
||
image_size = [posters.width, posters.height] if hasattr(posters, 'width') else [1350, 1800]
|
||
|
||
variations.append({
|
||
"id": f"{template_id}_v{i}",
|
||
"image": self._image_to_base64(posters),
|
||
"format": "PNG",
|
||
"size": image_size,
|
||
"file_path": str(output_path)
|
||
})
|
||
|
||
# 6. 如果需要,生成PSD分层文件
|
||
if generate_psd:
|
||
psd_result = self._generate_psd_file(
|
||
template_handler, images, final_content,
|
||
template_id, i, psd_output_path
|
||
)
|
||
if psd_result:
|
||
psd_files.append(psd_result)
|
||
|
||
# 记录模板使用情况
|
||
self._update_template_stats(template_id, bool(variations), time.time() - start_time)
|
||
|
||
return {
|
||
"requestId": f"poster-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}",
|
||
"templateId": template_id,
|
||
"resultImagesBase64": variations,
|
||
"psdFiles": psd_files if psd_files else None,
|
||
"metadata": {
|
||
"generation_time": f"{time.time() - start_time:.2f}s",
|
||
"model_used": self.ai_agent.config.model if force_llm_generation or not poster_content else None,
|
||
"num_variations": len(variations),
|
||
"psd_generated": bool(psd_files)
|
||
}
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"生成海报时发生错误: {e}", exc_info=True)
|
||
self._update_template_stats(template_id, False, time.time() - start_time)
|
||
raise ValueError(f"生成海报失败: {str(e)}")
|
||
|
||
def _save_poster(self, poster: Image.Image, template_id: str, variation_id: int=1) -> Optional[Path]:
|
||
"""保存海报到文件系统"""
|
||
try:
|
||
# 创建唯一的主题ID用于保存
|
||
topic_id = f"poster_{template_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||
|
||
# 获取输出目录
|
||
output_dir = self.output_manager.get_topic_dir(topic_id)
|
||
|
||
# 生成文件名
|
||
file_name = f"{template_id}_v{variation_id}.png"
|
||
file_path = output_dir / file_name
|
||
|
||
# 保存图像
|
||
poster.save(file_path, format="PNG")
|
||
logger.info(f"海报已保存: {file_path}")
|
||
|
||
return file_path
|
||
except Exception as e:
|
||
logger.error(f"保存海报失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
def _image_to_base64(self, image: Image.Image) -> str:
|
||
"""将PIL图像转换为base64字符串"""
|
||
buffer = BytesIO()
|
||
image.save(buffer, format="PNG")
|
||
return base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||
|
||
def _update_template_stats(self, template_id: str, success: bool, duration: float):
|
||
"""更新模板使用统计"""
|
||
try:
|
||
# 调用数据库服务的方法更新统计
|
||
self.db_service.update_template_usage_stats(
|
||
template_id=template_id,
|
||
success=success,
|
||
processing_time=duration
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新模板统计失败: {e}")
|
||
|
||
async def _generate_content_with_llm(self, template_id: str, content_id: Optional[int],
|
||
product_id: Optional[int], scenic_spot_id: Optional[int]) -> Optional[Dict[str, Any]]:
|
||
"""使用LLM生成海报内容"""
|
||
# 获取提示词 - 直接从数据库模板信息中获取
|
||
template_info = self._templates.get(template_id, {})
|
||
system_prompt = template_info.get('system_prompt', "")
|
||
user_prompt_template = template_info.get('user_prompt_template', "")
|
||
|
||
if not system_prompt or not user_prompt_template:
|
||
logger.error(f"模板 {template_id} 缺少提示词配置")
|
||
logger.debug(f"模板信息: {template_info}")
|
||
return None
|
||
|
||
logger.info(f"成功加载模板 {template_id} 的提示词配置")
|
||
|
||
# 获取相关数据
|
||
data = {}
|
||
if content_id:
|
||
data['content'] = self.db_service.get_content_by_id(content_id)
|
||
if product_id:
|
||
data['product'] = self.db_service.get_product_by_id(product_id)
|
||
if scenic_spot_id:
|
||
data['scenic_spot'] = self.db_service.get_scenic_spot_by_id(scenic_spot_id)
|
||
|
||
logger.info(f"获取到的数据: content={data.get('content') is not None}, product={data.get('product') is not None}, scenic_spot={data.get('scenic_spot') is not None}")
|
||
|
||
# 格式化数据为简洁的文本格式,参考其他模块的做法
|
||
try:
|
||
logger.info("开始格式化数据...")
|
||
|
||
# 景区信息格式化
|
||
scenic_info = "无相关景区信息"
|
||
if data.get('scenic_spot'):
|
||
logger.info("正在格式化景区信息...")
|
||
spot = data['scenic_spot']
|
||
scenic_info = f"""景区名称: {spot.get('name', '')}
|
||
地址: {spot.get('address', '')}
|
||
描述: {spot.get('description', '')}
|
||
优势: {spot.get('advantage', '')}
|
||
亮点: {spot.get('highlight', '')}
|
||
交通信息: {spot.get('trafficInfo', '')}"""
|
||
logger.info("景区信息格式化完成")
|
||
|
||
# 产品信息格式化
|
||
product_info = "无相关产品信息"
|
||
if data.get('product'):
|
||
logger.info("正在格式化产品信息...")
|
||
product = data['product']
|
||
product_info = f"""产品名称: {product.get('productName', '')}
|
||
原价: {product.get('originPrice', '')}
|
||
实际价格: {product.get('realPrice', '')}
|
||
套餐信息: {product.get('packageInfo', '')}
|
||
核心优势: {product.get('keyAdvantages', '')}
|
||
亮点: {product.get('highlights', '')}
|
||
详细描述: {product.get('detailedDescription', '')}"""
|
||
logger.info("产品信息格式化完成")
|
||
|
||
# 内容信息格式化
|
||
tweet_info = "无相关内容信息"
|
||
if data.get('content'):
|
||
logger.info("正在格式化内容信息...")
|
||
content = data['content']
|
||
tweet_info = f"""标题: {content.get('title', '')}
|
||
内容: {content.get('content', '')}"""
|
||
logger.info("内容信息格式化完成")
|
||
|
||
logger.info("开始构建用户提示词...")
|
||
# 构建用户提示词
|
||
user_prompt = user_prompt_template.format(
|
||
scenic_info=scenic_info,
|
||
product_info=product_info,
|
||
tweet_info=tweet_info
|
||
)
|
||
|
||
logger.info(f"用户提示词构建成功,长度: {len(user_prompt)}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"格式化提示词时发生错误: {e}", exc_info=True)
|
||
# 提供兜底方案
|
||
user_prompt = f"""{user_prompt_template}
|
||
|
||
当前可用数据:
|
||
- 景区信息: {'有' if data.get('scenic_spot') else '无'}
|
||
- 产品信息: {'有' if data.get('product') else '无'}
|
||
- 内容信息: {'有' if data.get('content') else '无'}
|
||
|
||
请根据可用信息生成海报内容。"""
|
||
|
||
try:
|
||
response, _, _, _ = await self.ai_agent.generate_text(
|
||
system_prompt=system_prompt,
|
||
user_prompt=user_prompt,
|
||
use_stream=True
|
||
)
|
||
|
||
# 提取JSON响应
|
||
json_start = response.find('{')
|
||
json_end = response.rfind('}') + 1
|
||
if json_start != -1 and json_end != -1:
|
||
result = json.loads(response[json_start:json_end])
|
||
logger.info(f"LLM生成内容成功: {list(result.keys())}")
|
||
return result
|
||
else:
|
||
logger.error(f"LLM响应中未找到JSON格式内容: {response[:200]}...")
|
||
return None
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"解析LLM响应JSON失败: {e}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"生成内容时发生错误: {e}", exc_info=True)
|
||
return None
|
||
|
||
def _generate_psd_file(self, template_handler: BaseTemplate, images: Image.Image,
|
||
content: Dict[str, Any], template_id: str,
|
||
variation_id: int, custom_output_path: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
生成PSD分层文件
|
||
|
||
Args:
|
||
template_handler: 模板处理器实例
|
||
images: 图像数据
|
||
content: 海报内容
|
||
template_id: 模板ID
|
||
variation_id: 变体ID
|
||
custom_output_path: 自定义输出路径
|
||
|
||
Returns:
|
||
PSD文件信息字典,包含文件路径、base64编码等
|
||
"""
|
||
try:
|
||
# 检查模板是否支持PSD生成
|
||
if not hasattr(template_handler, 'generate_layered_psd'):
|
||
logger.warning(f"模板 {template_id} 不支持PSD分层输出")
|
||
return None
|
||
|
||
# 生成PSD文件路径
|
||
if custom_output_path:
|
||
psd_filename = custom_output_path
|
||
if not psd_filename.endswith('.psd'):
|
||
psd_filename += '.psd'
|
||
else:
|
||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
psd_filename = f"{template_id}_layered_v{variation_id}_{timestamp}.psd"
|
||
|
||
# 获取输出目录
|
||
topic_id = f"poster_{template_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||
output_dir = self.output_manager.get_topic_dir(topic_id)
|
||
psd_path = output_dir / psd_filename
|
||
|
||
# 调用模板的PSD生成方法
|
||
logger.info(f"开始生成PSD分层文件: {psd_path}")
|
||
generated_psd_path = template_handler.generate_layered_psd(
|
||
images=images,
|
||
content=content,
|
||
output_path=str(psd_path)
|
||
)
|
||
|
||
if not generated_psd_path or not Path(generated_psd_path).exists():
|
||
logger.error("PSD文件生成失败或文件不存在")
|
||
return None
|
||
|
||
# 获取文件信息
|
||
file_size = Path(generated_psd_path).stat().st_size
|
||
|
||
# 可选:生成PSD的base64编码(注意:PSD文件通常较大)
|
||
psd_base64 = None
|
||
if file_size < 10 * 1024 * 1024: # 如果文件小于10MB,才生成base64
|
||
try:
|
||
with open(generated_psd_path, 'rb') as f:
|
||
psd_base64 = base64.b64encode(f.read()).decode('utf-8')
|
||
except Exception as e:
|
||
logger.warning(f"生成PSD文件base64编码失败: {e}")
|
||
|
||
# 生成预览图(从PSD合成PNG预览)
|
||
preview_base64 = None
|
||
try:
|
||
from psd_tools import PSDImage
|
||
psd = PSDImage.open(generated_psd_path)
|
||
preview_image = psd.composite()
|
||
if preview_image:
|
||
preview_base64 = self._image_to_base64(preview_image)
|
||
logger.info("PSD预览图生成成功")
|
||
except Exception as e:
|
||
logger.warning(f"生成PSD预览图失败: {e}")
|
||
|
||
logger.info(f"PSD文件生成成功: {generated_psd_path} ({file_size/1024:.1f}KB)")
|
||
|
||
return {
|
||
"id": f"{template_id}_v{variation_id}_psd",
|
||
"filename": psd_filename,
|
||
"data": psd_base64,
|
||
"size": file_size,
|
||
"format": "PSD"
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成PSD文件时发生错误: {e}", exc_info=True)
|
||
return None
|
||
|
||
def _get_psd_layer_count(self, psd_path: str) -> Optional[int]:
|
||
"""获取PSD文件的图层数量"""
|
||
try:
|
||
from psd_tools import PSDImage
|
||
psd = PSDImage.open(psd_path)
|
||
return len(list(psd))
|
||
except Exception as e:
|
||
logger.warning(f"获取PSD图层数量失败: {e}")
|
||
return None |