TravelContentCreator/utils/poster/image_processor.py
2025-07-08 18:24:23 +08:00

303 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
from PIL import Image
import numpy as np
from typing import Tuple, Union, Optional
import psutil
import gc # 添加垃圾回收模块
class ImageProcessor:
"""
图像处理工具类,提供智能图像加载和压缩功能
"""
def __init__(self, max_memory_percent=80, target_max_pixels=4000000):
"""
初始化图像处理器
Args:
max_memory_percent: 最大内存使用百分比,超过此值将更激进地压缩图像
target_max_pixels: 目标最大像素数(宽×高),用于控制图像大小
"""
self.max_memory_percent = max_memory_percent
self.target_max_pixels = target_max_pixels
self.logger = logging.getLogger(self.__class__.__name__)
# 记录已加载的图像,用于后续清理
self.loaded_images = []
def get_memory_usage(self) -> Tuple[float, float]:
"""
获取当前内存使用情况
Returns:
(已用内存百分比, 可用内存MB)
"""
memory = psutil.virtual_memory()
return memory.percent, memory.available / (1024 * 1024)
def check_memory_pressure(self) -> bool:
"""
检查内存压力,如果内存使用率超过阈值,执行垃圾回收
Returns:
是否存在内存压力
"""
mem_percent, available_mb = self.get_memory_usage()
# 如果内存使用率超过阈值或可用内存低于100MB认为存在内存压力
if mem_percent > self.max_memory_percent or available_mb < 100:
self.logger.warning(f"内存压力过大: 使用率 {mem_percent:.1f}%, 可用 {available_mb:.1f}MB执行垃圾回收")
self.force_garbage_collection()
return True
return False
def force_garbage_collection(self):
"""
强制执行垃圾回收并清理已加载的图像资源
"""
# 释放已加载的图像
self.release_loaded_images()
# 执行多次垃圾回收
for _ in range(3):
gc.collect()
# 记录垃圾回收后的内存状态
mem_percent, available_mb = self.get_memory_usage()
self.logger.info(f"垃圾回收后内存状态: 使用率 {mem_percent:.1f}%, 可用 {available_mb:.1f}MB")
def release_loaded_images(self):
"""
释放所有已加载的图像资源
"""
for img in self.loaded_images:
try:
if hasattr(img, 'close') and callable(img.close):
img.close()
except Exception as e:
self.logger.error(f"释放图像资源失败: {e}")
# 清空列表
self.loaded_images = []
self.logger.info("已释放所有已加载的图像资源")
def calculate_resize_factor(self, width: int, height: int) -> float:
"""
根据图像尺寸计算压缩因子
Args:
width: 图像宽度
height: 图像高度
Returns:
压缩因子 (0.1-1.0)
"""
# 计算原始像素数
original_pixels = width * height
# 如果小于目标像素数,不需要压缩
if original_pixels <= self.target_max_pixels:
return 1.0
# 计算基础压缩因子
base_factor = (self.target_max_pixels / original_pixels) ** 0.5
# 检查内存使用情况
mem_percent, available_mb = self.get_memory_usage()
# 根据内存使用情况调整压缩因子
if mem_percent > self.max_memory_percent:
# 内存紧张,增加压缩率
memory_factor = 1.0 - ((mem_percent - self.max_memory_percent) / 20)
memory_factor = max(0.5, memory_factor) # 至少保留50%质量
base_factor *= memory_factor
self.logger.warning(f"内存使用率高 ({mem_percent:.1f}%),增加压缩率,压缩因子调整为 {base_factor:.2f}")
# 确保压缩因子在合理范围内
return max(0.1, min(1.0, base_factor))
def smart_load_image(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> Image.Image:
"""
智能加载图像,根据内存情况和图像大小自动调整尺寸
Args:
image_path: 图像路径
target_size: 目标尺寸,如果指定则直接调整到此尺寸
Returns:
加载并调整大小后的PIL图像对象
"""
# 先检查内存压力
self.check_memory_pressure()
try:
# 检查文件是否存在
if not os.path.exists(image_path):
self.logger.error(f"图像文件不存在: {image_path}")
# 返回一个空白图像
return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
# 获取文件大小MB
file_size_mb = os.path.getsize(image_path) / (1024 * 1024)
# 如果文件过大,先检查内存
if file_size_mb > 10: # 大于10MB的文件
mem_percent, available_mb = self.get_memory_usage()
self.logger.info(f"大文件 ({file_size_mb:.1f}MB), 内存使用率: {mem_percent:.1f}%, 可用: {available_mb:.1f}MB")
if mem_percent > 90 or available_mb < file_size_mb * 5:
# 内存紧张,使用更保守的加载方式
self.logger.warning(f"内存不足,使用保守加载方式")
result = self._conservative_load(image_path, target_size)
self.loaded_images.append(result)
return result
# 常规加载
with Image.open(image_path) as img:
# 获取原始尺寸
original_width, original_height = img.size
self.logger.debug(f"原始图像尺寸: {original_width}x{original_height}")
# 如果指定了目标尺寸,直接调整
if target_size:
result = img.resize(target_size, Image.Resampling.LANCZOS)
self.loaded_images.append(result)
return result
# 计算压缩因子
resize_factor = self.calculate_resize_factor(original_width, original_height)
if resize_factor < 1.0:
# 需要压缩
new_width = int(original_width * resize_factor)
new_height = int(original_height * resize_factor)
self.logger.info(f"压缩图像 {image_path}{original_width}x{original_height}{new_width}x{new_height}")
result = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
self.loaded_images.append(result)
return result
else:
# 不需要压缩,返回原图副本
result = img.copy()
self.loaded_images.append(result)
return result
except Exception as e:
self.logger.error(f"加载图像 {image_path} 失败: {e}")
# 返回一个空白图像
return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
def _conservative_load(self, image_path: str, target_size: Optional[Tuple[int, int]] = None) -> Image.Image:
"""
保守加载大图像,先获取尺寸,然后以较低质量加载
Args:
image_path: 图像路径
target_size: 目标尺寸
Returns:
加载的PIL图像对象
"""
try:
# 先只获取图像信息,不加载像素数据
with Image.open(image_path) as img:
original_width, original_height = img.size
format = img.format
# 计算合适的缩小尺寸
if target_size:
new_width, new_height = target_size
else:
# 计算一个非常保守的压缩因子
pixels = original_width * original_height
conservative_factor = min(0.5, (2000000 / pixels) ** 0.5)
new_width = int(original_width * conservative_factor)
new_height = int(original_height * conservative_factor)
# 使用缩略图方式加载
with Image.open(image_path) as img:
img.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
# 创建新图像以确保释放原始文件句柄
result = img.copy()
self.logger.info(f"保守加载图像 {image_path}{original_width}x{original_height}{new_width}x{new_height}")
return result
except Exception as e:
self.logger.error(f"保守加载图像失败: {e}")
return Image.new('RGB', target_size or (800, 600), (240, 240, 240))
def batch_process_directory(self, directory: str, max_images: int = 100) -> list:
"""
批量处理目录中的图像,返回处理后的图像列表
Args:
directory: 图像目录
max_images: 最大处理图像数量
Returns:
处理后的PIL图像对象列表
"""
if not os.path.exists(directory):
self.logger.error(f"目录不存在: {directory}")
return []
# 获取所有图像文件
image_files = []
for root, _, files in os.walk(directory):
for file in files:
if file.lower().endswith(('.png', '.jpg', '.jpeg', '.webp', '.bmp')):
image_files.append(os.path.join(root, file))
# 限制图像数量
if len(image_files) > max_images:
self.logger.warning(f"图像文件过多 ({len(image_files)}), 限制为 {max_images}")
image_files = image_files[:max_images]
# 批量加载图像
images = []
for image_file in image_files:
img = self.smart_load_image(image_file)
if img:
images.append(img)
# 每处理5张图片检查一次内存压力
if len(images) % 5 == 0:
self.check_memory_pressure()
self.logger.info(f"已处理 {len(images)}/{len(image_files)} 个图像")
return images
def save_optimized_image(self, image: Image.Image, output_path: str, quality: int = 85) -> bool:
"""
保存优化后的图像
Args:
image: PIL图像对象
output_path: 输出路径
quality: JPEG质量 (1-100)
Returns:
是否保存成功
"""
try:
# 确保输出目录存在
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
# 检查图像模式如果是RGBA且保存为JPEG先转换为RGB
if image.mode == 'RGBA' and (output_path.lower().endswith('.jpg') or output_path.lower().endswith('.jpeg')):
image = image.convert('RGB')
# 保存图像
image.save(output_path, quality=quality, optimize=True)
self.logger.info(f"图像已保存至: {output_path}")
# 保存后检查内存压力
self.check_memory_pressure()
return True
except Exception as e:
self.logger.error(f"保存图像失败: {e}")
return False