333 lines
11 KiB
Python
333 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
图像处理工具类
|
||
用于处理base64编码、图像转换等功能
|
||
"""
|
||
|
||
import base64
|
||
import io
|
||
import logging
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
from pathlib import Path
|
||
from PIL import Image
|
||
import json
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ImageProcessor:
|
||
"""图像处理工具类"""
|
||
|
||
@staticmethod
|
||
def image_to_base64(image_path: str, format: str = "PNG") -> str:
|
||
"""
|
||
将图像文件转换为base64编码
|
||
|
||
Args:
|
||
image_path: 图像文件路径
|
||
format: 输出格式 (PNG, JPEG等)
|
||
|
||
Returns:
|
||
base64编码的字符串
|
||
"""
|
||
try:
|
||
with open(image_path, "rb") as image_file:
|
||
# 读取图像文件
|
||
image_data = image_file.read()
|
||
|
||
# 转换为base64
|
||
base64_data = base64.b64encode(image_data).decode('utf-8')
|
||
|
||
# 添加MIME类型前缀
|
||
mime_type = f"image/{format.lower()}"
|
||
if format.upper() == "JPEG":
|
||
mime_type = "image/jpeg"
|
||
elif format.upper() == "PNG":
|
||
mime_type = "image/png"
|
||
|
||
return f"data:{mime_type};base64,{base64_data}"
|
||
|
||
except Exception as e:
|
||
logger.error(f"图像转base64失败: {e}")
|
||
return ""
|
||
|
||
@staticmethod
|
||
def pil_image_to_base64(image: Image.Image, format: str = "PNG") -> str:
|
||
"""
|
||
将PIL图像对象转换为base64编码
|
||
|
||
Args:
|
||
image: PIL图像对象
|
||
format: 输出格式 (PNG, JPEG等)
|
||
|
||
Returns:
|
||
base64编码的字符串
|
||
"""
|
||
try:
|
||
# 创建内存缓冲区
|
||
buffer = io.BytesIO()
|
||
|
||
# 保存图像到缓冲区
|
||
if format.upper() == "JPEG":
|
||
image.save(buffer, format="JPEG", quality=85)
|
||
else:
|
||
image.save(buffer, format="PNG")
|
||
|
||
# 获取字节数据
|
||
image_data = buffer.getvalue()
|
||
|
||
# 转换为base64
|
||
base64_data = base64.b64encode(image_data).decode('utf-8')
|
||
|
||
# 添加MIME类型前缀
|
||
mime_type = f"image/{format.lower()}"
|
||
if format.upper() == "JPEG":
|
||
mime_type = "image/jpeg"
|
||
elif format.upper() == "PNG":
|
||
mime_type = "image/png"
|
||
|
||
return f"data:{mime_type};base64,{base64_data}"
|
||
|
||
except Exception as e:
|
||
logger.error(f"PIL图像转base64失败: {e}")
|
||
return ""
|
||
|
||
@staticmethod
|
||
def base64_to_image(base64_string: str) -> Optional[Image.Image]:
|
||
"""
|
||
将base64编码转换为PIL图像对象
|
||
|
||
Args:
|
||
base64_string: base64编码的字符串
|
||
|
||
Returns:
|
||
PIL图像对象,如果转换失败则返回None
|
||
"""
|
||
try:
|
||
# 移除MIME类型前缀
|
||
if "data:image" in base64_string:
|
||
base64_data = base64_string.split(",")[1]
|
||
else:
|
||
base64_data = base64_string
|
||
|
||
# 解码base64
|
||
image_data = base64.b64decode(base64_data)
|
||
|
||
# 创建PIL图像对象
|
||
image = Image.open(io.BytesIO(image_data))
|
||
|
||
return image
|
||
|
||
except Exception as e:
|
||
logger.error(f"base64转图像失败: {e}")
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_image_metadata(image_path: str) -> Dict[str, Any]:
|
||
"""
|
||
获取图像文件的元数据
|
||
|
||
Args:
|
||
image_path: 图像文件路径
|
||
|
||
Returns:
|
||
元数据字典
|
||
"""
|
||
try:
|
||
with Image.open(image_path) as image:
|
||
metadata = {
|
||
"width": image.width,
|
||
"height": image.height,
|
||
"format": image.format,
|
||
"mode": image.mode,
|
||
"size_bytes": Path(image_path).stat().st_size if Path(image_path).exists() else 0
|
||
}
|
||
|
||
# 获取EXIF数据(如果有)
|
||
if hasattr(image, '_getexif') and image._getexif():
|
||
metadata["exif"] = dict(image._getexif())
|
||
|
||
return metadata
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取图像元数据失败: {e}")
|
||
return {}
|
||
|
||
@staticmethod
|
||
def resize_image(image: Image.Image, max_width: int = 1920, max_height: int = 1080) -> Image.Image:
|
||
"""
|
||
调整图像尺寸
|
||
|
||
Args:
|
||
image: PIL图像对象
|
||
max_width: 最大宽度
|
||
max_height: 最大高度
|
||
|
||
Returns:
|
||
调整后的图像对象
|
||
"""
|
||
try:
|
||
# 计算缩放比例
|
||
width, height = image.size
|
||
scale = min(max_width / width, max_height / height, 1.0)
|
||
|
||
if scale < 1.0:
|
||
new_width = int(width * scale)
|
||
new_height = int(height * scale)
|
||
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||
|
||
return image
|
||
|
||
except Exception as e:
|
||
logger.error(f"调整图像尺寸失败: {e}")
|
||
return image
|
||
|
||
@staticmethod
|
||
def create_collage_from_paths(image_paths: List[str], target_size: Tuple[int, int] = (900, 1200)) -> Optional[Image.Image]:
|
||
"""
|
||
从图像路径列表创建拼图
|
||
|
||
Args:
|
||
image_paths: 图像路径列表
|
||
target_size: 目标尺寸 (宽度, 高度)
|
||
|
||
Returns:
|
||
拼图图像对象,如果创建失败则返回None
|
||
"""
|
||
try:
|
||
if not image_paths:
|
||
logger.warning("没有提供图像路径")
|
||
return None
|
||
|
||
# 加载图像
|
||
images = []
|
||
for path in image_paths[:4]: # 最多使用4张图片
|
||
try:
|
||
with Image.open(path) as img:
|
||
# 转换为RGB模式
|
||
if img.mode != 'RGB':
|
||
img = img.convert('RGB')
|
||
images.append(img)
|
||
except Exception as e:
|
||
logger.warning(f"加载图像失败 {path}: {e}")
|
||
continue
|
||
|
||
if not images:
|
||
logger.error("没有成功加载任何图像")
|
||
return None
|
||
|
||
# 创建拼图画布
|
||
collage = Image.new('RGB', target_size, (255, 255, 255))
|
||
|
||
# 计算每个图像的位置和尺寸
|
||
if len(images) == 1:
|
||
# 单张图片,居中显示
|
||
img = images[0]
|
||
img = ImageProcessor.resize_image(img, target_size[0], target_size[1])
|
||
x = (target_size[0] - img.width) // 2
|
||
y = (target_size[1] - img.height) // 2
|
||
collage.paste(img, (x, y))
|
||
elif len(images) == 2:
|
||
# 两张图片,上下排列
|
||
for i, img in enumerate(images):
|
||
img = ImageProcessor.resize_image(img, target_size[0], target_size[1] // 2)
|
||
x = (target_size[0] - img.width) // 2
|
||
y = i * (target_size[1] // 2)
|
||
collage.paste(img, (x, y))
|
||
elif len(images) >= 3:
|
||
# 多张图片,2x2网格
|
||
cell_width = target_size[0] // 2
|
||
cell_height = target_size[1] // 2
|
||
|
||
for i, img in enumerate(images[:4]):
|
||
img = ImageProcessor.resize_image(img, cell_width, cell_height)
|
||
row = i // 2
|
||
col = i % 2
|
||
x = col * cell_width
|
||
y = row * cell_height
|
||
collage.paste(img, (x, y))
|
||
|
||
return collage
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建拼图失败: {e}")
|
||
return None
|
||
|
||
@staticmethod
|
||
def process_images_for_poster(image_paths: List[str],
|
||
target_size: Tuple[int, int] = (900, 1200),
|
||
create_collage: bool = False) -> Dict[str, Any]:
|
||
"""
|
||
处理图像用于海报生成
|
||
|
||
Args:
|
||
image_paths: 图像路径列表
|
||
target_size: 目标尺寸
|
||
create_collage: 是否创建拼图
|
||
|
||
Returns:
|
||
处理结果字典
|
||
"""
|
||
result = {
|
||
"main_image": None,
|
||
"collage_image": None,
|
||
"all_images": [],
|
||
"metadata": {}
|
||
}
|
||
|
||
try:
|
||
# 处理所有图像
|
||
for i, path in enumerate(image_paths):
|
||
try:
|
||
# 转换为base64
|
||
base64_data = ImageProcessor.image_to_base64(path)
|
||
|
||
# 获取元数据
|
||
metadata = ImageProcessor.get_image_metadata(path)
|
||
|
||
image_info = {
|
||
"index": i,
|
||
"path": path,
|
||
"base64": base64_data,
|
||
"metadata": metadata
|
||
}
|
||
|
||
result["all_images"].append(image_info)
|
||
|
||
# 第一张图片作为主图
|
||
if i == 0:
|
||
result["main_image"] = image_info
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理图像失败 {path}: {e}")
|
||
continue
|
||
|
||
# 如果需要创建拼图
|
||
if create_collage and len(image_paths) > 1:
|
||
collage = ImageProcessor.create_collage_from_paths(image_paths, target_size)
|
||
if collage:
|
||
collage_base64 = ImageProcessor.pil_image_to_base64(collage, "JPEG")
|
||
result["collage_image"] = {
|
||
"base64": collage_base64,
|
||
"metadata": {
|
||
"width": collage.width,
|
||
"height": collage.height,
|
||
"format": "JPEG",
|
||
"type": "collage"
|
||
}
|
||
}
|
||
|
||
# 添加总体元数据
|
||
result["metadata"] = {
|
||
"total_images": len(result["all_images"]),
|
||
"has_collage": result["collage_image"] is not None,
|
||
"target_size": target_size
|
||
}
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理图像失败: {e}")
|
||
return result |