2025-08-04 11:23:50 +08:00

1142 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
海报服务层 - 重构版本
封装核心功能,支持基于模板的动态内容生成和海报创建
"""
import logging
import uuid
import time
import json
import importlib
import base64
import binascii
from io import BytesIO
from typing import List, Dict, Any, Optional, Type, Union, cast
from datetime import datetime
from pathlib import Path
from PIL import Image
from core.config import ConfigManager, PosterConfig
from core.ai import AIAgent
from utils.file_io import OutputManager
from utils.image_processor import ImageProcessor
from poster.templates.base_template import BaseTemplate
from api.services.database_service import DatabaseService
logger = logging.getLogger(__name__)
class PosterService:
"""海报服务类"""
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
"""初始化海报服务"""
self.ai_agent = ai_agent
self.config_manager = config_manager
self.output_manager = output_manager
self.db_service = DatabaseService(config_manager)
self._templates = {}
self._template_instances = {}
self._image_usage_tracker = {}
self._init_templates()
def _init_templates(self):
"""从数据库加载模板配置"""
try:
db_templates = self.db_service.get_active_poster_templates()
if db_templates:
self._templates = {t['id']: t for t in db_templates}
logger.info(f"从数据库加载了 {len(self._templates)} 个模板")
else:
self._load_default_templates()
logger.info("数据库无模板,使用默认模板配置")
except Exception as e:
logger.error(f"从数据库加载模板失败: {e}", exc_info=True)
self._load_default_templates()
def _load_default_templates(self):
"""加载默认模板配置"""
self._templates = {
'vibrant': {
'id': 'vibrant',
'name': '活力风格',
'handler_path': 'poster.templates.vibrant_template',
'class_name': 'VibrantTemplate',
'description': '适合景点、活动等充满活力的场景',
'is_active': True
},
'business': {
'id': 'business',
'name': '商务风格',
'handler_path': 'poster.templates.business_template',
'class_name': 'BusinessTemplate',
'description': '适合酒店、房地产等商务场景',
'is_active': True
}
}
def _load_template_handler(self, template_id: str) -> Optional[BaseTemplate]:
"""动态加载模板处理器"""
if template_id not in self._templates:
logger.error(f"未找到模板: {template_id}")
return None
# 如果已经实例化过,直接返回缓存的实例
if template_id in self._template_instances:
return self._template_instances[template_id]
template_info = self._templates[template_id]
handler_path = template_info.get('handler_path')
class_name = template_info.get('class_name')
if not handler_path or not class_name:
logger.error(f"模板 {template_id} 缺少 handler_path 或 class_name")
return None
try:
# 动态导入模块和类
module = importlib.import_module(handler_path)
template_class = getattr(module, class_name)
# 实例化模板
template_instance = template_class()
# 设置字体目录(如果配置了)
from core.config import PosterConfig
# poster_config = self.config_manager.get_config('poster', PosterConfig)
# if poster_config:
# font_dir = poster_config.font_dir
# if font_dir and hasattr(template_instance, 'set_font_dir'):
# template_instance.set_font_dir(font_dir)
# 缓存实例以便重用
self._template_instances[template_id] = template_instance
logger.info(f"成功加载模板处理器: {template_id} ({handler_path}.{class_name})")
return template_instance
except (ImportError, AttributeError) as e:
logger.error(f"加载模板处理器失败: {e}", exc_info=True)
return None
def reload_templates(self):
"""重新加载模板信息"""
logger.info("重新加载模板信息...")
self._init_templates()
# 清除缓存的模板实例,以便重新加载
self._template_instances = {}
def get_available_templates(self) -> List[Dict[str, Any]]:
"""获取所有可用的模板信息"""
result = []
for tid in self._templates:
template = self._templates[tid]
if template.get('is_active', True): # 默认为激活状态
template_info = {
"id": template["id"],
"name": template["name"],
"description": template["description"],
"handlerPath": template.get("handler_path", ""),
"className": template.get("class_name", ""),
"isActive": template.get("is_active", True)
}
result.append(template_info)
return result
def get_template_info(self, template_id: str) -> Optional[Dict[str, Any]]:
"""获取指定模板的简化信息"""
template = self._templates.get(template_id)
if not template:
return None
return {
"id": template["id"],
"name": template["name"],
"description": template["description"],
"has_prompts": bool(template.get("system_prompt") and template.get("user_prompt_template")),
"input_format": template.get("input_format", {}),
"output_format": template.get("output_format", {}),
"is_active": template.get("is_active", False)
}
async def generate_poster(self,
template_id: str,
poster_content: Optional[Dict[str, Any]],
content_id: Optional[str],
product_id: Optional[str],
scenic_spot_id: Optional[str],
images_base64: Optional[List[str]] ,
num_variations: int = 1,
force_llm_generation: bool = False,
generate_psd: bool = False,
psd_output_path: Optional[str] = None,
generate_fabric_json: bool = False) -> Dict[str, Any]:
"""
统一的海报生成入口
Args:
template_id: 模板ID
poster_content: 用户提供的海报内容(可选)
content_id: 内容ID用于从数据库获取内容可选
product_id: 产品ID用于从数据库获取产品信息可选
scenic_spot_id: 景点ID用于从数据库获取景点信息可选
images_base64: 图片base64编码用于生成海报可选
num_variations: 需要生成的变体数量
force_llm_generation: 是否强制使用LLM生成内容
generate_psd: 是否生成PSD分层文件
psd_output_path: PSD文件输出路径可选默认自动生成
generate_fabric_json: 是否生成Fabric.js JSON格式
Returns:
生成结果字典包含PNG图像和可选的PSD文件、Fabric.js JSON
"""
start_time = time.time()
# 添加参数调试信息
logger.info("=" * 100)
logger.info("海报生成服务 - 接收到的参数:")
logger.info(f" template_id: {template_id} (类型: {type(template_id)})")
logger.info(f" content_id: {content_id} (类型: {type(content_id)})")
logger.info(f" product_id: {product_id} (类型: {type(product_id)})")
logger.info(f" scenic_spot_id: {scenic_spot_id} (类型: {type(scenic_spot_id)})")
logger.info(f" poster_content: {poster_content is not None}")
logger.info(f" poster_content详细内容: {poster_content}")
logger.info(f" force_llm_generation: {force_llm_generation}")
logger.info("=" * 100)
# 1. 动态加载模板处理器
template_handler = self._load_template_handler(template_id)
if not template_handler:
raise ValueError(f"无法为模板ID '{template_id}' 加载处理器。")
# 2. 准备内容 (LLM或用户提供)
final_content = poster_content
if force_llm_generation or not final_content:
logger.info(f"为模板 {template_id} 按需生成内容...")
final_content = await self._generate_content_with_llm(template_id, content_id, product_id, scenic_spot_id, poster_content)
if not final_content:
raise ValueError("无法获取用于生成海报的内容")
# # 3. 准备图片
# images = []
# if image_ids:
# images = self.db_service.get_images_by_ids(image_ids)
# if not images:
# raise ValueError("无法获取指定的图片")
# # 3. 图片解码
images = None
# 获取模板的默认尺寸,如果获取不到则使用标准尺寸
template_size = getattr(template_handler, 'size', (900, 1200))
if images_base64 and images_base64.strip():
try:
# 移除可能存在的MIME类型前缀
if images_base64.startswith("data:"):
images_base64 = images_base64.split(",", 1)[1]
# 彻底清理base64字符串 - 移除所有空白字符
images_base64 = ''.join(images_base64.split())
# 验证base64字符串长度应该是4的倍数
if len(images_base64) % 4 != 0:
# 添加必要的填充
images_base64 += '=' * (4 - len(images_base64) % 4)
logger.info(f"为base64字符串添加了填充最终长度: {len(images_base64)}")
logger.info(f"准备解码base64数据长度: {len(images_base64)}, 前20字符: {images_base64[:20]}...")
# 解码base64
image_bytes = base64.b64decode(images_base64)
logger.info(f"base64解码成功图片数据大小: {len(image_bytes)} bytes")
# 验证解码后的数据不为空
if len(image_bytes) == 0:
raise ValueError("解码后的图片数据为空")
# 检查文件头判断图片格式
file_header = image_bytes[:10]
if file_header.startswith(b'\xff\xd8\xff'):
logger.info("检测到JPEG格式图片")
elif file_header.startswith(b'\x89PNG'):
logger.info("检测到PNG格式图片")
else:
logger.warning(f"未识别的图片格式,文件头: {file_header.hex()}")
# 创建PIL Image对象
image_io = BytesIO(image_bytes)
images = Image.open(image_io)
# 验证图片是否成功打开
images.verify() # 验证图片完整性
# 重新打开图片verify后需要重新打开
image_io.seek(0)
images = Image.open(image_io)
logger.info(f"图片解码成功,格式: {images.format}, 尺寸: {images.size}, 模式: {images.mode}")
except binascii.Error as e:
logger.error(f"Base64解码失败: {e}")
logger.error(f"问题数据长度: {len(images_base64) if 'images_base64' in locals() else 'unknown'}")
# 创建一个与目标大小一致的纯黑底图
images = Image.new('RGB', template_size, color='black')
logger.info(f"创建默认黑色背景图,尺寸: {template_size}")
except Exception as e:
logger.error(f"图片处理失败: {e}")
logger.error(f"错误类型: {type(e).__name__}")
if 'image_bytes' in locals():
logger.error(f"图片数据大小: {len(image_bytes)} bytes, 前20字节: {image_bytes[:20].hex()}")
# 创建一个与目标大小一致的纯黑底图
images = Image.new('RGB', template_size, color='black')
logger.info(f"创建默认黑色背景图,尺寸: {template_size}")
else:
logger.warning("未提供图片数据,使用默认黑色背景图")
# 创建一个与目标大小一致的纯黑底图
images = Image.new('RGB', template_size, color='black')
logger.info(f"创建默认黑色背景图,尺寸: {template_size}")
# 4. 调用模板生成海报
try:
posters = template_handler.generate(
content=final_content,
images=images,
num_variations=num_variations
)
if not posters:
raise ValueError("模板未能生成有效的海报")
# 5. 保存海报并返回结果
variations = []
psd_files = []
fabric_jsons = []
i=0 ## 用于多个海报时,指定海报的编号,此时只有一个没有用上,但是接口开放着。
output_path = self._save_poster(posters, template_id, i)
if output_path:
# 获取图像尺寸
image_size = [posters.width, posters.height] if hasattr(posters, 'width') else [1350, 1800]
variations.append({
"id": f"{template_id}_v{i}",
"image": self._image_to_base64(posters),
"format": "PNG",
"size": image_size,
"file_path": str(output_path)
})
# 6. 如果需要生成PSD分层文件
if generate_psd:
psd_result = self._generate_psd_file(
template_handler, images, final_content,
template_id, i, psd_output_path
)
if psd_result:
psd_files.append(psd_result)
# 7. 如果需要生成Fabric.js JSON
if generate_fabric_json:
fabric_json = self._generate_fabric_json(final_content, template_id, image_size, images)
fabric_jsons.append(fabric_json)
# 记录模板使用情况
self._update_template_stats(template_id, bool(variations), time.time() - start_time)
return {
"requestId": f"poster-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}",
"templateId": template_id,
"resultImagesBase64": variations,
"psdFiles": psd_files if psd_files else None,
"fabricJsons": fabric_jsons if fabric_jsons else None,
"metadata": {
"generation_time": f"{time.time() - start_time:.2f}s",
"model_used": self.ai_agent.config.model if force_llm_generation or not poster_content else None,
"num_variations": len(variations),
"psd_generated": bool(psd_files),
"fabric_json_generated": bool(fabric_jsons)
}
}
except Exception as e:
logger.error(f"生成海报时发生错误: {e}", exc_info=True)
self._update_template_stats(template_id, False, time.time() - start_time)
raise ValueError(f"生成海报失败: {str(e)}")
def _save_poster(self, poster: Image.Image, template_id: str, variation_id: int=1) -> Optional[Path]:
"""保存海报到文件系统"""
try:
# 创建唯一的主题ID用于保存
topic_id = f"poster_{template_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# 获取输出目录
output_dir = self.output_manager.get_topic_dir(topic_id)
# 生成文件名
file_name = f"{template_id}_v{variation_id}.png"
file_path = output_dir / file_name
# 保存图像
poster.save(file_path, format="PNG")
logger.info(f"海报已保存: {file_path}")
return file_path
except Exception as e:
logger.error(f"保存海报失败: {e}", exc_info=True)
return None
def _image_to_base64(self, image: Image.Image) -> str:
"""将PIL图像转换为base64字符串"""
buffer = BytesIO()
image.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode('utf-8')
def _update_template_stats(self, template_id: str, success: bool, duration: float):
"""更新模板使用统计"""
try:
# 调用数据库服务的方法更新统计
self.db_service.update_template_usage_stats(
template_id=template_id,
success=success,
processing_time=duration
)
except Exception as e:
logger.warning(f"更新模板统计失败: {e}")
async def _generate_content_with_llm(self, template_id: str, content_id: Optional[str],
product_id: Optional[str], scenic_spot_id: Optional[str],
poster_content: Optional[Any] = None) -> Optional[Dict[str, Any]]:
"""使用LLM生成海报内容"""
# 获取提示词 - 直接从数据库模板信息中获取
template_info = self._templates.get(template_id, {})
system_prompt = template_info.get('system_prompt', "")
user_prompt_template = template_info.get('user_prompt_template', "")
if not system_prompt or not user_prompt_template:
logger.error(f"模板 {template_id} 缺少提示词配置")
logger.debug(f"模板信息: {template_info}")
return None
logger.info(f"成功加载模板 {template_id} 的提示词配置")
# 获取相关数据 - 将字符串ID转换为整数
data = {}
def safe_int_convert(id_str: Optional[str]) -> Optional[int]:
"""安全将字符串ID转换为整数"""
if not id_str:
return None
try:
# 如果ID包含非数字字符只提取数字部分或返回None
if id_str.isdigit():
return int(id_str)
else:
# 对于类似 "generated_note_1753693091224_0" 的ID提取数字部分
import re
numbers = re.findall(r'\d+', id_str)
if numbers:
return int(numbers[0]) # 使用第一个数字序列
return None
except (ValueError, TypeError):
logger.warning(f"无法转换ID为整数: {id_str}")
return None
if content_id:
content_id_int = safe_int_convert(content_id)
if content_id_int:
data['content'] = self.db_service.get_content_by_id(content_id_int)
if product_id:
product_id_int = safe_int_convert(product_id)
if product_id_int:
data['product'] = self.db_service.get_product_by_id(product_id_int)
if scenic_spot_id:
scenic_spot_id_int = safe_int_convert(scenic_spot_id)
if scenic_spot_id_int:
data['scenic_spot'] = self.db_service.get_scenic_spot_by_id(scenic_spot_id_int)
logger.info(f"获取到的数据: content={data.get('content') is not None}, product={data.get('product') is not None}, scenic_spot={data.get('scenic_spot') is not None}")
# 格式化数据为简洁的文本格式,参考其他模块的做法
try:
logger.info("开始格式化数据...")
# 景区信息格式化
scenic_info = "无相关景区信息"
if data.get('scenic_spot'):
logger.info("正在格式化景区信息...")
spot = data['scenic_spot']
scenic_info = f"""景区名称: {spot.get('name', '')}
地址: {spot.get('address', '')}
描述: {spot.get('description', '')}
优势: {spot.get('advantage', '')}
亮点: {spot.get('highlight', '')}
交通信息: {spot.get('trafficInfo', '')}"""
logger.info("景区信息格式化完成")
# 产品信息格式化
product_info = "无相关产品信息"
if data.get('product'):
logger.info("正在格式化产品信息...")
product = data['product']
product_info = f"""产品名称: {product.get('productName', '')}
原价: {product.get('originPrice', '')}
实际价格: {product.get('realPrice', '')}
套餐信息: {product.get('packageInfo', '')}
核心优势: {product.get('keyAdvantages', '')}
亮点: {product.get('highlights', '')}
详细描述: {product.get('detailedDescription', '')}"""
logger.info("产品信息格式化完成")
# 内容信息格式化
tweet_info = "无相关内容信息"
if data.get('content'):
logger.info("正在格式化内容信息...")
content = data['content']
tweet_info = f"""标题: {content.get('title', '')}
内容: {content.get('content', '')}"""
logger.info("内容信息格式化完成")
logger.info("开始构建用户提示词...")
# 构建用户提示词
user_prompt = user_prompt_template.format(
scenic_info=scenic_info,
product_info=product_info,
tweet_info=tweet_info
)
logger.info(f"用户提示词构建成功,长度: {len(user_prompt)}")
except Exception as e:
logger.error(f"格式化提示词时发生错误: {e}", exc_info=True)
# 提供兜底方案
user_prompt = f"""{user_prompt_template}
当前可用数据:
- 景区信息: {'' if data.get('scenic_spot') else ''}
- 产品信息: {'' if data.get('product') else ''}
- 内容信息: {'' if data.get('content') else ''}
请根据可用信息生成海报内容。"""
try:
response, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True
)
# 提取JSON响应
json_start = response.find('{')
json_end = response.rfind('}') + 1
if json_start != -1 and json_end != -1:
result = json.loads(response[json_start:json_end])
logger.info(f"LLM生成内容成功: {list(result.keys())}")
return result
else:
logger.error(f"LLM响应中未找到JSON格式内容: {response[:200]}...")
return None
except json.JSONDecodeError as e:
logger.error(f"解析LLM响应JSON失败: {e}")
return None
except Exception as e:
logger.error(f"生成内容时发生错误: {e}", exc_info=True)
return None
def _generate_psd_file(self, template_handler: BaseTemplate, images: Image.Image,
content: Dict[str, Any], template_id: str,
variation_id: int, custom_output_path: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
生成Fabric.js JSON文件保持接口兼容性实际生成JSON而非PSD
Args:
template_handler: 模板处理器实例
images: 图像数据
content: 海报内容
template_id: 模板ID
variation_id: 变体ID
custom_output_path: 自定义输出路径
Returns:
JSON文件信息字典包含文件路径、base64编码等
"""
try:
# 获取图像尺寸
image_size = [images.width, images.height] if hasattr(images, 'width') else [900, 1200]
# 生成Fabric.js JSON数据
fabric_json = self._generate_fabric_json(content, template_id, image_size, images)
# 生成JSON文件路径
if custom_output_path:
json_filename = custom_output_path
if not json_filename.endswith('.json'):
json_filename += '.json'
else:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
json_filename = f"{template_id}_fabric_v{variation_id}_{timestamp}.json"
# 获取输出目录
topic_id = f"poster_{template_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
output_dir = self.output_manager.get_topic_dir(topic_id)
json_path = output_dir / json_filename
# 保存JSON文件
logger.info(f"开始生成Fabric.js JSON文件: {json_path}")
try:
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(fabric_json, f, ensure_ascii=False, indent=2)
logger.info(f"Fabric.js JSON文件保存成功: {json_path}")
except Exception as e:
logger.error(f"保存JSON文件失败: {e}")
return None
# 获取文件信息
file_size = Path(json_path).stat().st_size
# 生成JSON的base64编码
json_base64 = None
try:
json_string = json.dumps(fabric_json, ensure_ascii=False)
json_base64 = base64.b64encode(json_string.encode('utf-8')).decode('utf-8')
except Exception as e:
logger.warning(f"生成JSON base64编码失败: {e}")
logger.info(f"Fabric.js JSON文件生成成功: {json_path} ({file_size/1024:.1f}KB)")
return {
"id": f"{template_id}_v{variation_id}_fabric",
"filename": json_filename,
"data": json_base64,
"size": file_size,
"format": "JSON",
"json_data": fabric_json # 添加原始JSON数据
}
except Exception as e:
logger.error(f"生成Fabric.js JSON文件时发生错误: {e}", exc_info=True)
return None
def _get_json_object_count(self, json_path: str) -> Optional[int]:
"""获取Fabric.js JSON文件的对象数量"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
fabric_data = json.load(f)
return len(fabric_data.get('objects', []))
except Exception as e:
logger.warning(f"获取JSON对象数量失败: {e}")
return None
def _generate_fabric_json(self, content: Dict[str, Any], template_id: str, image_size: List[int], images: Image.Image = None) -> Dict[str, Any]:
"""
生成支持多级分层的Fabric.js JSON格式
Args:
content: 海报内容数据
template_id: 模板ID
image_size: 图像尺寸 [width, height]
images: 用户上传的图片
Returns:
Dict: 支持多级分层的Fabric.js JSON格式数据
"""
try:
fabric_objects = []
# 基础画布尺寸
canvas_width, canvas_height = image_size[0], image_size[1]
# 1. 图片层组 (最底层 - 用户上传的图片)
image_group = self._create_image_layer(images, canvas_width, canvas_height)
fabric_objects.append(image_group)
# 2. 背景层组 (第二层)
background_group = self._create_background_layer(canvas_width, canvas_height, template_id)
fabric_objects.append(background_group)
# 3. 内容层组 (中间层)
content_group = self._create_content_layer(content, canvas_width, canvas_height)
fabric_objects.append(content_group)
# 4. 装饰层组 (顶层)
decoration_group = self._create_decoration_layer(content, canvas_width, canvas_height)
fabric_objects.append(decoration_group)
# 构建完整的Fabric.js JSON
fabric_json = {
"version": "5.3.0",
"objects": fabric_objects,
"background": "transparent",
"backgroundImage": None,
"overlayImage": None,
"clipPath": None,
"width": canvas_width,
"height": canvas_height,
"viewportTransform": [1, 0, 0, 1, 0, 0],
"backgroundVpt": True,
"overlayVpt": True,
"selection": True,
"preserveObjectStacking": True,
"snapAngle": 0,
"snapThreshold": 10,
"centeredScaling": False,
"centeredRotation": True,
"interactive": True,
"skipTargetFind": False,
"enableRetinaScaling": True,
"imageSmoothingEnabled": True,
"perPixelTargetFind": False,
"targetFindTolerance": 0,
"skipOffscreen": True,
"includeDefaultValues": True
}
logger.info(f"成功生成多级分层Fabric.js JSON包含 {len(fabric_objects)} 个层组")
return fabric_json
except Exception as e:
logger.error(f"生成Fabric.js JSON失败: {e}")
return {
"version": "5.3.0",
"objects": [],
"background": "transparent",
"width": image_size[0],
"height": image_size[1]
}
def _create_image_layer(self, images: Image.Image, canvas_width: int, canvas_height: int) -> Dict[str, Any]:
"""创建图片层组(最底层)"""
image_objects = []
if images and hasattr(images, 'width'):
# 将PIL图像转换为base64以便在Fabric.js中使用
image_base64 = self._image_to_base64(images)
# 计算图片的缩放比例,保持宽高比
image_width, image_height = images.width, images.height
scale_x = canvas_width / image_width
scale_y = canvas_height / image_height
scale = min(scale_x, scale_y) # 保持宽高比的适应缩放
# 计算居中位置
scaled_width = image_width * scale
scaled_height = image_height * scale
left = (canvas_width - scaled_width) / 2
top = (canvas_height - scaled_height) / 2
image_objects.append({
"type": "image",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": left,
"top": top,
"width": image_width,
"height": image_height,
"scaleX": scale,
"scaleY": scale,
"angle": 0,
"flipX": False,
"flipY": False,
"opacity": 1,
"visible": True,
"src": f"data:image/png;base64,{image_base64}",
"crossOrigin": "anonymous",
"name": "user_uploaded_image",
"data": {
"type": "user_image",
"replaceable": True,
"original_size": [image_width, image_height],
"scale_ratio": scale
},
"selectable": True,
"evented": True,
"moveCursor": "move",
"cornerStyle": "circle",
"cornerSize": 12,
"transparentCorners": False,
"cornerColor": "#4dabf7",
"cornerStrokeColor": "#ffffff",
"borderColor": "#4dabf7",
"borderScaleFactor": 2
})
else:
# 如果没有图片,创建一个占位符
image_objects.append({
"type": "rect",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"fill": "#f8f9fa",
"stroke": "#dee2e6",
"strokeWidth": 2,
"strokeDashArray": [10, 5],
"name": "image_placeholder",
"data": {
"type": "placeholder",
"replaceable": True,
"placeholder_text": "点击上传图片"
},
"selectable": True,
"evented": True
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"angle": 0,
"scaleX": 1,
"scaleY": 1,
"flipX": False,
"flipY": False,
"opacity": 1,
"visible": True,
"name": "image_layer",
"data": {"layer": "image", "level": 0, "replaceable": True},
"objects": image_objects,
"selectable": False,
"evented": True
}
def _create_background_layer(self, canvas_width: int, canvas_height: int, template_id: str) -> Dict[str, Any]:
"""创建背景层组"""
background_objects = []
# 主背景
background_objects.append({
"type": "rect",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"fill": "transparent",
"stroke": None,
"name": "main_background",
"selectable": False,
"evented": False
})
# 渐变背景(可选)
if template_id:
background_objects.append({
"type": "rect",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"fill": {
"type": "linear",
"coords": {
"x1": 0,
"y1": 0,
"x2": 0,
"y2": canvas_height
},
"colorStops": [
{"offset": 0, "color": "#ffffff", "opacity": 0.8},
{"offset": 1, "color": "#f8f9fa", "opacity": 0.9}
]
},
"name": "gradient_background",
"selectable": False,
"evented": False
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"angle": 0,
"scaleX": 1,
"scaleY": 1,
"flipX": False,
"flipY": False,
"opacity": 1,
"visible": True,
"name": "background_layer",
"data": {"layer": "background", "level": 1},
"objects": background_objects,
"selectable": False,
"evented": False
}
def _create_content_layer(self, content: Dict[str, Any], canvas_width: int, canvas_height: int) -> Dict[str, Any]:
"""创建内容层组,包含多个子分层"""
content_objects = []
# 标题组
title_group = self._create_title_group(content, canvas_width)
if title_group["objects"]:
content_objects.append(title_group)
# 正文组
body_group = self._create_body_group(content, canvas_width)
if body_group["objects"]:
content_objects.append(body_group)
# 价格信息组
price_group = self._create_price_group(content, canvas_width)
if price_group["objects"]:
content_objects.append(price_group)
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"angle": 0,
"scaleX": 1,
"scaleY": 1,
"flipX": False,
"flipY": False,
"opacity": 1,
"visible": True,
"name": "content_layer",
"data": {"layer": "content", "level": 2},
"objects": content_objects
}
def _create_title_group(self, content: Dict[str, Any], canvas_width: int) -> Dict[str, Any]:
"""创建标题分组"""
title_objects = []
# 主标题
if content.get('title'):
title_objects.append({
"type": "textbox",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 50,
"top": 80,
"width": canvas_width - 100,
"height": 80,
"fill": "#2c3e50",
"fontFamily": "Arial, sans-serif",
"fontWeight": "bold",
"fontSize": 48,
"text": str(content['title']),
"textAlign": "center",
"name": "main_title",
"data": {"type": "title", "priority": "high"}
})
# 副标题
if content.get('slogan'):
title_objects.append({
"type": "textbox",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 50,
"top": 170,
"width": canvas_width - 100,
"height": 40,
"fill": "#7f8c8d",
"fontFamily": "Arial, sans-serif",
"fontWeight": "normal",
"fontSize": 24,
"text": str(content['slogan']),
"textAlign": "center",
"name": "subtitle",
"data": {"type": "subtitle", "priority": "medium"}
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": 250,
"name": "title_group",
"data": {"section": "title", "level": 3},
"objects": title_objects
}
def _create_body_group(self, content: Dict[str, Any], canvas_width: int) -> Dict[str, Any]:
"""创建正文分组"""
body_objects = []
if content.get('content'):
text_content = content['content']
if isinstance(text_content, list):
text_content = '\n'.join(text_content)
body_objects.append({
"type": "textbox",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 50,
"top": 280,
"width": canvas_width - 100,
"height": 120,
"fill": "#34495e",
"fontFamily": "Arial, sans-serif",
"fontWeight": "normal",
"fontSize": 18,
"text": str(text_content),
"textAlign": "left",
"lineHeight": 1.4,
"name": "main_content",
"data": {"type": "content", "priority": "medium"}
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 250,
"width": canvas_width,
"height": 150,
"name": "body_group",
"data": {"section": "body", "level": 3},
"objects": body_objects
}
def _create_price_group(self, content: Dict[str, Any], canvas_width: int) -> Dict[str, Any]:
"""创建价格信息分组"""
price_objects = []
if content.get('price'):
# 价格背景
price_objects.append({
"type": "rect",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 40,
"top": 420,
"width": canvas_width - 80,
"height": 80,
"fill": "#e74c3c",
"rx": 10,
"ry": 10,
"name": "price_background"
})
# 价格文字
price_objects.append({
"type": "textbox",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 50,
"top": 440,
"width": canvas_width - 100,
"height": 40,
"fill": "#ffffff",
"fontFamily": "Arial, sans-serif",
"fontWeight": "bold",
"fontSize": 36,
"text": str(content['price']),
"textAlign": "center",
"name": "price_text",
"data": {"type": "price", "priority": "high"}
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 400,
"width": canvas_width,
"height": 100,
"name": "price_group",
"data": {"section": "price", "level": 3},
"objects": price_objects
}
def _create_decoration_layer(self, content: Dict[str, Any], canvas_width: int, canvas_height: int) -> Dict[str, Any]:
"""创建装饰层组"""
decoration_objects = []
# 装饰边框
decoration_objects.append({
"type": "rect",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 20,
"top": 20,
"width": canvas_width - 40,
"height": canvas_height - 40,
"fill": "transparent",
"stroke": "#3498db",
"strokeWidth": 3,
"strokeDashArray": [10, 5],
"name": "decoration_border",
"selectable": False
})
# 角落装饰
decoration_objects.append({
"type": "circle",
"version": "5.3.0",
"originX": "center",
"originY": "center",
"left": canvas_width - 50,
"top": 50,
"radius": 20,
"fill": "#f39c12",
"name": "corner_decoration",
"selectable": False
})
return {
"type": "group",
"version": "5.3.0",
"originX": "left",
"originY": "top",
"left": 0,
"top": 0,
"width": canvas_width,
"height": canvas_height,
"angle": 0,
"scaleX": 1,
"scaleY": 1,
"flipX": False,
"flipY": False,
"opacity": 1,
"visible": True,
"name": "decoration_layer",
"data": {"layer": "decoration", "level": 3},
"objects": decoration_objects,
"selectable": False
}