#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 海报服务层 - 简化版本 封装核心功能,重点优化图片使用追踪 """ import logging import uuid import time from typing import List, Dict, Any, Optional from datetime import datetime from core.config import ConfigManager, PosterConfig from core.ai import AIAgent from utils.file_io import OutputManager from utils.image_processor import ImageProcessor from poster.poster_generator import PosterGenerator from api.services.database_service import DatabaseService logger = logging.getLogger(__name__) class PosterService: """海报服务类""" def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager): """ 初始化海报服务 Args: ai_agent: AI代理 config_manager: 配置管理器 output_manager: 输出管理器 """ self.ai_agent = ai_agent self.config_manager = config_manager self.output_manager = output_manager # 初始化各个组件 self.poster_generator = PosterGenerator(config_manager, output_manager) # 初始化数据库服务 self.db_service = DatabaseService(config_manager) # 图片使用追踪存储(实际应用中应该使用数据库) self._image_usage_tracker = {} def generate_poster_simplified(self, content_id: Optional[int] = None, product_id: Optional[int] = None, scenic_spot_id: Optional[int] = None, image_ids: Optional[List[int]] = None, generate_collage: bool = False) -> Dict[str, Any]: """ 简化的海报生成方法 Args: content_id: 内容ID product_id: 产品ID scenic_spot_id: 景区ID image_ids: 图像ID列表 generate_collage: 是否生成拼图 Returns: 包含base64图像数据和图片使用信息的字典 """ start_time = time.time() logger.info(f"开始生成海报: content_id={content_id}, product_id={product_id}, scenic_spot_id={scenic_spot_id}") result = { "request_id": f"poster-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}", "poster_base64": "", "content_info": None, "product_info": None, "scenic_spot_info": None, "used_image_ids": [], "image_usage_info": [], "collage_base64": None, "metadata": {} } try: # 1. 获取内容信息 if content_id: content_info = self.db_service.get_content_by_id(content_id) if content_info: result["content_info"] = self._build_content_info(content_info) # 2. 获取产品信息 if product_id: product_info = self.db_service.get_product_by_id(product_id) if product_info: result["product_info"] = self._build_product_info(product_info) # 3. 获取景区信息 if scenic_spot_id: scenic_spot_info = self.db_service.get_scenic_spot_by_id(scenic_spot_id) if scenic_spot_info: result["scenic_spot_info"] = self._build_scenic_spot_info(scenic_spot_info) # 4. 处理图像信息并追踪使用情况 image_paths = [] if image_ids: images = self.db_service.get_images_by_ids(image_ids) for img in images: image_paths.append(img['filePath']) result["used_image_ids"].append(img['id']) # 更新图片使用追踪 self._update_image_usage(img['id'], "poster_generation") # 5. 构建海报内容 poster_content = self._build_poster_content_from_info( result["content_info"], result["product_info"], result["scenic_spot_info"] ) # 6. 生成海报(只使用vibrant模板) poster_path = self.poster_generator.generate_poster( poster_content, str(content_id or product_id or scenic_spot_id or "unknown"), "vibrant" ) if poster_path: # 转换为base64 result["poster_base64"] = ImageProcessor.image_to_base64(poster_path) # 7. 处理拼图 if generate_collage and len(image_paths) > 1: collage_result = ImageProcessor.process_images_for_poster( image_paths, target_size=(900, 1200), create_collage=True ) if collage_result.get("collage_image"): result["collage_base64"] = collage_result["collage_image"]["base64"] # 更新拼图中使用的图片使用追踪 for img_id in result["used_image_ids"]: self._update_image_usage(img_id, "collage_creation") # 8. 构建图片使用信息 result["image_usage_info"] = self._get_image_usage_info(result["used_image_ids"]) # 9. 添加元数据 processing_time = time.time() - start_time result["metadata"] = { "total_images_used": len(result["used_image_ids"]), "has_collage": result["collage_base64"] is not None, "processing_time": f"{processing_time:.2f}s", "template_used": "vibrant" } logger.info(f"海报生成完成,处理时间: {processing_time:.2f}s") return result except Exception as e: logger.error(f"生成海报失败: {e}", exc_info=True) result["metadata"]["error"] = str(e) return result def get_image_usage_info(self, image_ids: List[int]) -> Dict[str, Any]: """ 获取图像使用情况信息 Args: image_ids: 图像ID列表 Returns: 图像使用情况信息 """ request_id = f"usage-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}" image_usage_info = [] total_usage_count = 0 usage_counts = [] for img_id in image_ids: usage_info = self._get_single_image_usage_info(img_id) if usage_info: image_usage_info.append(usage_info) total_usage_count += usage_info["usage_count"] usage_counts.append(usage_info["usage_count"]) # 计算汇总信息 summary = { "total_images": len(image_ids), "total_usage_count": total_usage_count, "most_used_image_id": None, "least_used_image_id": None } if usage_counts: max_usage = max(usage_counts) min_usage = min(usage_counts) summary["most_used_image_id"] = next( (info["image_id"] for info in image_usage_info if info["usage_count"] == max_usage), None ) summary["least_used_image_id"] = next( (info["image_id"] for info in image_usage_info if info["usage_count"] == min_usage), None ) return { "request_id": request_id, "image_usage_info": image_usage_info, "summary": summary } def _update_image_usage(self, image_id: int, context: str): """更新图片使用追踪""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if image_id not in self._image_usage_tracker: self._image_usage_tracker[image_id] = { "usage_count": 0, "first_used_at": current_time, "last_used_at": current_time, "usage_context": [] } tracker = self._image_usage_tracker[image_id] tracker["usage_count"] += 1 tracker["last_used_at"] = current_time if context not in tracker["usage_context"]: tracker["usage_context"].append(context) def _get_single_image_usage_info(self, image_id: int) -> Optional[Dict[str, Any]]: """获取单个图片的使用信息""" if image_id in self._image_usage_tracker: tracker = self._image_usage_tracker[image_id] return { "image_id": image_id, "usage_count": tracker["usage_count"], "first_used_at": tracker["first_used_at"], "last_used_at": tracker["last_used_at"], "usage_context": tracker["usage_context"] } else: # 如果图片从未被使用过,返回默认信息 return { "image_id": image_id, "usage_count": 0, "first_used_at": "", "last_used_at": "", "usage_context": [] } def _get_image_usage_info(self, image_ids: List[int]) -> List[Dict[str, Any]]: """获取图片使用信息列表""" usage_info = [] for img_id in image_ids: info = self._get_single_image_usage_info(img_id) if info: usage_info.append(info) return usage_info def _build_content_info(self, content_data: Dict[str, Any]) -> Dict[str, Any]: """构建内容信息""" return { "id": content_data["id"], "title": content_data["title"], "content": content_data["content"], "tag": content_data["tag"] } def _build_product_info(self, product_data: Dict[str, Any]) -> Dict[str, Any]: """构建产品信息""" return { "id": product_data["id"], "name": product_data["name"], "description": product_data.get("description"), "real_price": float(product_data["realPrice"]) if product_data.get("realPrice") else None, "origin_price": float(product_data["originPrice"]) if product_data.get("originPrice") else None } def _build_scenic_spot_info(self, scenic_data: Dict[str, Any]) -> Dict[str, Any]: """构建景区信息""" return { "id": scenic_data["id"], "name": scenic_data["name"], "description": scenic_data.get("description"), "address": scenic_data.get("address") } def _build_poster_content_from_info(self, content_info: Optional[Dict[str, Any]], product_info: Optional[Dict[str, Any]], scenic_spot_info: Optional[Dict[str, Any]]) -> Dict[str, Any]: """从信息构建海报内容""" title = "" content_parts = [] tags = [] # 构建标题 if content_info: title = content_info["title"] if content_info.get("content"): content_parts.append(content_info["content"]) if content_info.get("tag"): tags.extend(content_info["tag"].split(",")) else: # 如果没有内容信息,使用景区和产品信息构建标题 if scenic_spot_info and product_info: title = f"{scenic_spot_info['name']} - {product_info['name']}" elif scenic_spot_info: title = scenic_spot_info['name'] elif product_info: title = product_info['name'] # 添加景区信息 if scenic_spot_info and scenic_spot_info.get("description"): content_parts.append(f"景区介绍: {scenic_spot_info['description']}") tags.append(scenic_spot_info["name"]) # 添加产品信息 if product_info: if product_info.get("description"): content_parts.append(f"产品介绍: {product_info['description']}") if product_info.get("real_price"): content_parts.append(f"价格: ¥{product_info['real_price']}") tags.append(product_info["name"]) content = "\n\n".join(content_parts) if content_parts else "暂无详细内容" return { "title": title, "content": content, "tag": tags }