TravelContentCreator/utils/image_processor.py

333 lines
11 KiB
Python
Raw Normal View History

2025-07-18 19:32:55 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图像处理工具类
用于处理base64编码图像转换等功能
"""
import base64
import io
import logging
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from PIL import Image
import json
logger = logging.getLogger(__name__)
class ImageProcessor:
"""图像处理工具类"""
@staticmethod
def image_to_base64(image_path: str, format: str = "PNG") -> str:
"""
将图像文件转换为base64编码
Args:
image_path: 图像文件路径
format: 输出格式 (PNG, JPEG等)
Returns:
base64编码的字符串
"""
try:
with open(image_path, "rb") as image_file:
# 读取图像文件
image_data = image_file.read()
# 转换为base64
base64_data = base64.b64encode(image_data).decode('utf-8')
# 添加MIME类型前缀
mime_type = f"image/{format.lower()}"
if format.upper() == "JPEG":
mime_type = "image/jpeg"
elif format.upper() == "PNG":
mime_type = "image/png"
return f"data:{mime_type};base64,{base64_data}"
except Exception as e:
logger.error(f"图像转base64失败: {e}")
return ""
@staticmethod
def pil_image_to_base64(image: Image.Image, format: str = "PNG") -> str:
"""
将PIL图像对象转换为base64编码
Args:
image: PIL图像对象
format: 输出格式 (PNG, JPEG等)
Returns:
base64编码的字符串
"""
try:
# 创建内存缓冲区
buffer = io.BytesIO()
# 保存图像到缓冲区
if format.upper() == "JPEG":
image.save(buffer, format="JPEG", quality=85)
else:
image.save(buffer, format="PNG")
# 获取字节数据
image_data = buffer.getvalue()
# 转换为base64
base64_data = base64.b64encode(image_data).decode('utf-8')
# 添加MIME类型前缀
mime_type = f"image/{format.lower()}"
if format.upper() == "JPEG":
mime_type = "image/jpeg"
elif format.upper() == "PNG":
mime_type = "image/png"
return f"data:{mime_type};base64,{base64_data}"
except Exception as e:
logger.error(f"PIL图像转base64失败: {e}")
return ""
@staticmethod
def base64_to_image(base64_string: str) -> Optional[Image.Image]:
"""
将base64编码转换为PIL图像对象
Args:
base64_string: base64编码的字符串
Returns:
PIL图像对象如果转换失败则返回None
"""
try:
# 移除MIME类型前缀
if "data:image" in base64_string:
base64_data = base64_string.split(",")[1]
else:
base64_data = base64_string
# 解码base64
image_data = base64.b64decode(base64_data)
# 创建PIL图像对象
image = Image.open(io.BytesIO(image_data))
return image
except Exception as e:
logger.error(f"base64转图像失败: {e}")
return None
@staticmethod
def get_image_metadata(image_path: str) -> Dict[str, Any]:
"""
获取图像文件的元数据
Args:
image_path: 图像文件路径
Returns:
元数据字典
"""
try:
with Image.open(image_path) as image:
metadata = {
"width": image.width,
"height": image.height,
"format": image.format,
"mode": image.mode,
"size_bytes": Path(image_path).stat().st_size if Path(image_path).exists() else 0
}
# 获取EXIF数据如果有
if hasattr(image, '_getexif') and image._getexif():
metadata["exif"] = dict(image._getexif())
return metadata
except Exception as e:
logger.error(f"获取图像元数据失败: {e}")
return {}
@staticmethod
def resize_image(image: Image.Image, max_width: int = 1920, max_height: int = 1080) -> Image.Image:
"""
调整图像尺寸
Args:
image: PIL图像对象
max_width: 最大宽度
max_height: 最大高度
Returns:
调整后的图像对象
"""
try:
# 计算缩放比例
width, height = image.size
scale = min(max_width / width, max_height / height, 1.0)
if scale < 1.0:
new_width = int(width * scale)
new_height = int(height * scale)
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
return image
except Exception as e:
logger.error(f"调整图像尺寸失败: {e}")
return image
@staticmethod
2025-08-04 16:40:46 +08:00
def create_collage_from_paths(image_paths: List[str], target_size: Tuple[int, int] = (1350, 1800)) -> Optional[Image.Image]:
2025-07-18 19:32:55 +08:00
"""
从图像路径列表创建拼图
Args:
image_paths: 图像路径列表
target_size: 目标尺寸 (宽度, 高度)
Returns:
拼图图像对象如果创建失败则返回None
"""
try:
if not image_paths:
logger.warning("没有提供图像路径")
return None
# 加载图像
images = []
for path in image_paths[:4]: # 最多使用4张图片
try:
with Image.open(path) as img:
# 转换为RGB模式
if img.mode != 'RGB':
img = img.convert('RGB')
images.append(img)
except Exception as e:
logger.warning(f"加载图像失败 {path}: {e}")
continue
if not images:
logger.error("没有成功加载任何图像")
return None
# 创建拼图画布
collage = Image.new('RGB', target_size, (255, 255, 255))
# 计算每个图像的位置和尺寸
if len(images) == 1:
# 单张图片,居中显示
img = images[0]
img = ImageProcessor.resize_image(img, target_size[0], target_size[1])
x = (target_size[0] - img.width) // 2
y = (target_size[1] - img.height) // 2
collage.paste(img, (x, y))
elif len(images) == 2:
# 两张图片,上下排列
for i, img in enumerate(images):
img = ImageProcessor.resize_image(img, target_size[0], target_size[1] // 2)
x = (target_size[0] - img.width) // 2
y = i * (target_size[1] // 2)
collage.paste(img, (x, y))
elif len(images) >= 3:
# 多张图片2x2网格
cell_width = target_size[0] // 2
cell_height = target_size[1] // 2
for i, img in enumerate(images[:4]):
img = ImageProcessor.resize_image(img, cell_width, cell_height)
row = i // 2
col = i % 2
x = col * cell_width
y = row * cell_height
collage.paste(img, (x, y))
return collage
except Exception as e:
logger.error(f"创建拼图失败: {e}")
return None
@staticmethod
def process_images_for_poster(image_paths: List[str],
2025-08-04 16:40:46 +08:00
target_size: Tuple[int, int] = (1350, 1800),
2025-07-18 19:32:55 +08:00
create_collage: bool = False) -> Dict[str, Any]:
"""
处理图像用于海报生成
Args:
image_paths: 图像路径列表
target_size: 目标尺寸
create_collage: 是否创建拼图
Returns:
处理结果字典
"""
result = {
"main_image": None,
"collage_image": None,
"all_images": [],
"metadata": {}
}
try:
# 处理所有图像
for i, path in enumerate(image_paths):
try:
# 转换为base64
base64_data = ImageProcessor.image_to_base64(path)
# 获取元数据
metadata = ImageProcessor.get_image_metadata(path)
image_info = {
"index": i,
"path": path,
"base64": base64_data,
"metadata": metadata
}
result["all_images"].append(image_info)
# 第一张图片作为主图
if i == 0:
result["main_image"] = image_info
except Exception as e:
logger.error(f"处理图像失败 {path}: {e}")
continue
# 如果需要创建拼图
if create_collage and len(image_paths) > 1:
collage = ImageProcessor.create_collage_from_paths(image_paths, target_size)
if collage:
collage_base64 = ImageProcessor.pil_image_to_base64(collage, "JPEG")
result["collage_image"] = {
"base64": collage_base64,
"metadata": {
"width": collage.width,
"height": collage.height,
"format": "JPEG",
"type": "collage"
}
}
# 添加总体元数据
result["metadata"] = {
"total_images": len(result["all_images"]),
"has_collage": result["collage_image"] is not None,
"target_size": target_size
}
return result
except Exception as e:
logger.error(f"处理图像失败: {e}")
return result