#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 图像处理工具类 用于处理base64编码、图像转换等功能 """ import base64 import io import logging from typing import Dict, Any, List, Optional, Tuple from pathlib import Path from PIL import Image import json logger = logging.getLogger(__name__) class ImageProcessor: """图像处理工具类""" @staticmethod def image_to_base64(image_path: str, format: str = "PNG") -> str: """ 将图像文件转换为base64编码 Args: image_path: 图像文件路径 format: 输出格式 (PNG, JPEG等) Returns: base64编码的字符串 """ try: with open(image_path, "rb") as image_file: # 读取图像文件 image_data = image_file.read() # 转换为base64 base64_data = base64.b64encode(image_data).decode('utf-8') # 添加MIME类型前缀 mime_type = f"image/{format.lower()}" if format.upper() == "JPEG": mime_type = "image/jpeg" elif format.upper() == "PNG": mime_type = "image/png" return f"data:{mime_type};base64,{base64_data}" except Exception as e: logger.error(f"图像转base64失败: {e}") return "" @staticmethod def pil_image_to_base64(image: Image.Image, format: str = "PNG") -> str: """ 将PIL图像对象转换为base64编码 Args: image: PIL图像对象 format: 输出格式 (PNG, JPEG等) Returns: base64编码的字符串 """ try: # 创建内存缓冲区 buffer = io.BytesIO() # 保存图像到缓冲区 if format.upper() == "JPEG": image.save(buffer, format="JPEG", quality=85) else: image.save(buffer, format="PNG") # 获取字节数据 image_data = buffer.getvalue() # 转换为base64 base64_data = base64.b64encode(image_data).decode('utf-8') # 添加MIME类型前缀 mime_type = f"image/{format.lower()}" if format.upper() == "JPEG": mime_type = "image/jpeg" elif format.upper() == "PNG": mime_type = "image/png" return f"data:{mime_type};base64,{base64_data}" except Exception as e: logger.error(f"PIL图像转base64失败: {e}") return "" @staticmethod def base64_to_image(base64_string: str) -> Optional[Image.Image]: """ 将base64编码转换为PIL图像对象 Args: base64_string: base64编码的字符串 Returns: PIL图像对象,如果转换失败则返回None """ try: # 移除MIME类型前缀 if "data:image" in base64_string: base64_data = base64_string.split(",")[1] else: base64_data = base64_string # 解码base64 image_data = base64.b64decode(base64_data) # 创建PIL图像对象 image = Image.open(io.BytesIO(image_data)) return image except Exception as e: logger.error(f"base64转图像失败: {e}") return None @staticmethod def get_image_metadata(image_path: str) -> Dict[str, Any]: """ 获取图像文件的元数据 Args: image_path: 图像文件路径 Returns: 元数据字典 """ try: with Image.open(image_path) as image: metadata = { "width": image.width, "height": image.height, "format": image.format, "mode": image.mode, "size_bytes": Path(image_path).stat().st_size if Path(image_path).exists() else 0 } # 获取EXIF数据(如果有) if hasattr(image, '_getexif') and image._getexif(): metadata["exif"] = dict(image._getexif()) return metadata except Exception as e: logger.error(f"获取图像元数据失败: {e}") return {} @staticmethod def resize_image(image: Image.Image, max_width: int = 1920, max_height: int = 1080) -> Image.Image: """ 调整图像尺寸 Args: image: PIL图像对象 max_width: 最大宽度 max_height: 最大高度 Returns: 调整后的图像对象 """ try: # 计算缩放比例 width, height = image.size scale = min(max_width / width, max_height / height, 1.0) if scale < 1.0: new_width = int(width * scale) new_height = int(height * scale) image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) return image except Exception as e: logger.error(f"调整图像尺寸失败: {e}") return image @staticmethod def create_collage_from_paths(image_paths: List[str], target_size: Tuple[int, int] = (900, 1200)) -> Optional[Image.Image]: """ 从图像路径列表创建拼图 Args: image_paths: 图像路径列表 target_size: 目标尺寸 (宽度, 高度) Returns: 拼图图像对象,如果创建失败则返回None """ try: if not image_paths: logger.warning("没有提供图像路径") return None # 加载图像 images = [] for path in image_paths[:4]: # 最多使用4张图片 try: with Image.open(path) as img: # 转换为RGB模式 if img.mode != 'RGB': img = img.convert('RGB') images.append(img) except Exception as e: logger.warning(f"加载图像失败 {path}: {e}") continue if not images: logger.error("没有成功加载任何图像") return None # 创建拼图画布 collage = Image.new('RGB', target_size, (255, 255, 255)) # 计算每个图像的位置和尺寸 if len(images) == 1: # 单张图片,居中显示 img = images[0] img = ImageProcessor.resize_image(img, target_size[0], target_size[1]) x = (target_size[0] - img.width) // 2 y = (target_size[1] - img.height) // 2 collage.paste(img, (x, y)) elif len(images) == 2: # 两张图片,上下排列 for i, img in enumerate(images): img = ImageProcessor.resize_image(img, target_size[0], target_size[1] // 2) x = (target_size[0] - img.width) // 2 y = i * (target_size[1] // 2) collage.paste(img, (x, y)) elif len(images) >= 3: # 多张图片,2x2网格 cell_width = target_size[0] // 2 cell_height = target_size[1] // 2 for i, img in enumerate(images[:4]): img = ImageProcessor.resize_image(img, cell_width, cell_height) row = i // 2 col = i % 2 x = col * cell_width y = row * cell_height collage.paste(img, (x, y)) return collage except Exception as e: logger.error(f"创建拼图失败: {e}") return None @staticmethod def process_images_for_poster(image_paths: List[str], target_size: Tuple[int, int] = (900, 1200), create_collage: bool = False) -> Dict[str, Any]: """ 处理图像用于海报生成 Args: image_paths: 图像路径列表 target_size: 目标尺寸 create_collage: 是否创建拼图 Returns: 处理结果字典 """ result = { "main_image": None, "collage_image": None, "all_images": [], "metadata": {} } try: # 处理所有图像 for i, path in enumerate(image_paths): try: # 转换为base64 base64_data = ImageProcessor.image_to_base64(path) # 获取元数据 metadata = ImageProcessor.get_image_metadata(path) image_info = { "index": i, "path": path, "base64": base64_data, "metadata": metadata } result["all_images"].append(image_info) # 第一张图片作为主图 if i == 0: result["main_image"] = image_info except Exception as e: logger.error(f"处理图像失败 {path}: {e}") continue # 如果需要创建拼图 if create_collage and len(image_paths) > 1: collage = ImageProcessor.create_collage_from_paths(image_paths, target_size) if collage: collage_base64 = ImageProcessor.pil_image_to_base64(collage, "JPEG") result["collage_image"] = { "base64": collage_base64, "metadata": { "width": collage.width, "height": collage.height, "format": "JPEG", "type": "collage" } } # 添加总体元数据 result["metadata"] = { "total_images": len(result["all_images"]), "has_collage": result["collage_image"] is not None, "target_size": target_size } return result except Exception as e: logger.error(f"处理图像失败: {e}") return result