From 219fcbbd57328e8b2dee2528cd72c05f34e0e83d Mon Sep 17 00:00:00 2001 From: jinye_huang Date: Tue, 6 May 2025 16:34:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=A4=A7=E4=BA=86hash=E5=B9=B2?= =?UTF-8?q?=E6=89=B0=E5=BC=BA=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/poster_notes_creator.py | 437 +++++++++++++++++++--------------- 1 file changed, 243 insertions(+), 194 deletions(-) diff --git a/utils/poster_notes_creator.py b/utils/poster_notes_creator.py index 2fab22d..dd08bb5 100644 --- a/utils/poster_notes_creator.py +++ b/utils/poster_notes_creator.py @@ -11,6 +11,15 @@ from PIL import ImageEnhance, ImageFilter from .output_handler import OutputHandler import io +# 尝试导入 scipy,如果失败则标记 +try: + from scipy.fftpack import dct, idct + SCIPY_AVAILABLE = True +except ImportError: + SCIPY_AVAILABLE = False + dct = None + idct = None + logger = logging.getLogger(__name__) class PosterNotesCreator: @@ -348,10 +357,98 @@ class PosterNotesCreator: logger.error(traceback.format_exc()) return None + def add_dct_noise(self, image: Image.Image, intensity: float = 0.1, block_size: int = 8) -> Image.Image: + """ + 在DCT域添加噪声以对抗pHash (需要Scipy) + + Args: + image: 输入图像 (建议传入灰度图或处理亮度通道) + intensity: 噪声强度 (0-1) + block_size: DCT块大小 (通常为8) + + Returns: + 添加噪声后的图像 + """ + if not SCIPY_AVAILABLE: + logger.warning("Scipy 未安装,无法执行DCT噪声注入。请运行 'pip install scipy'") + # 可以选择返回原图,或执行一个简化的备用方案 + # 这里我们返回原图 + return image + + try: + # 确保是灰度图或提取亮度通道 (这里以灰度为例) + if image.mode != 'L': + # 如果是彩色图,可以在 Y 通道 (亮度) 操作 + # 为了简化,我们先转为灰度处理 + gray_image = image.convert('L') + else: + gray_image = image + + img_array = np.array(gray_image, dtype=float) + h, w = img_array.shape + + # 确保尺寸是块大小的倍数 + h_pad = (block_size - h % block_size) % block_size + w_pad = (block_size - w % block_size) % block_size + if h_pad != 0 or w_pad != 0: + img_array = np.pad(img_array, ((0, h_pad), (0, w_pad)), mode='reflect') + padded_h, padded_w = img_array.shape + else: + padded_h, padded_w = h, w + + # 分块处理 + for y in range(0, padded_h, block_size): + for x in range(0, padded_w, block_size): + block = img_array[y:y+block_size, x:x+block_size] + + # 执行2D DCT + dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho') + + # 在非DC系数上添加噪声 (跳过 dct_block[0, 0]) + # 噪声强度与系数幅度相关,避免在小系数上加过大噪声 + noise = np.random.randn(block_size, block_size) * intensity * np.abs(dct_block) + # noise = np.random.uniform(-intensity*50, intensity*50, (block_size, block_size)) + noise[0, 0] = 0 # 不改变DC系数 + + # 将噪声添加到DCT系数 + noisy_dct_block = dct_block + noise + + # 执行2D IDCT + idct_block = idct(idct(noisy_dct_block.T, norm='ortho').T, norm='ortho') + + # 将处理后的块放回图像数组 + img_array[y:y+block_size, x:x+block_size] = idct_block + + # 裁剪回原始尺寸 (如果有填充) + if h_pad != 0 or w_pad != 0: + img_array = img_array[:h, :w] + + # 裁剪像素值并转换类型 + img_array = np.clip(img_array, 0, 255) + modified_gray = Image.fromarray(img_array.astype(np.uint8)) + + # 如果原图是彩色,将修改后的亮度通道合并回去 + if image.mode == 'RGB' and gray_image is not image: + # 注意:简单替换亮度通道可能效果不好,混合通常更好 + # 这里用混合的方式 + blend_factor = 0.3 # 控制混合强度 + r, g, b = image.split() + r = Image.blend(r, modified_gray, blend_factor) + g = Image.blend(g, modified_gray, blend_factor) + b = Image.blend(b, modified_gray, blend_factor) + return Image.merge('RGB', (r, g, b)) + else: + # 如果原图是灰度或处理失败,返回修改后的灰度图 + return modified_gray + + except Exception as e: + logger.error(f"DCT噪声注入出错: {e}") + return image # 出错时返回原图 + def add_phash_noise(self, image: Image.Image, intensity: float = 0.05) -> Image.Image: """ 添加扰动以对抗感知哈希算法(pHash) - 通过在频域添加低频扰动实现 + 现在调用基于 Scipy 的 DCT 噪声注入方法 Args: image: 输入图像 @@ -360,238 +457,126 @@ class PosterNotesCreator: Returns: 添加扰动后的图像 """ - # 灰度化处理 - gray_image = image.convert('L') - width, height = gray_image.size - - # 确保宽高是8的倍数(DCT通常用8x8块) - new_width = (width // 8) * 8 - new_height = (height // 8) * 8 - if new_width != width or new_height != height: - gray_image = gray_image.resize((new_width, new_height)) - - # 转为numpy数组 - img_array = np.array(gray_image) - - # 简化版DCT域扰动 - # 分块处理图像 - for y in range(0, new_height, 8): - for x in range(0, new_width, 8): - block = img_array[y:y+8, x:x+8].astype(float) - - # 简单DCT - 对块应用频域变化 - # 这里使用简单方法模拟DCT效果 - # 真正的DCT需要使用scipy.fftpack - avg = np.mean(block) - # 修改低频区块(除直流分量外) - noise_value = random.uniform(-intensity * 10, intensity * 10) - - # 扰动左上角的低频系数(类似于DCT中的低频区域) - block[1:3, 1:3] += noise_value - - # 应用回原图 - img_array[y:y+8, x:x+8] = np.clip(block, 0, 255) - - # 转回PIL图像 - modified_image = Image.fromarray(img_array.astype(np.uint8)) - - # 调整回原始大小 - if new_width != width or new_height != height: - modified_image = modified_image.resize((width, height), Image.LANCZOS) - - # 将修改后的灰度通道应用到原彩色图像 - if image.mode == 'RGB': - r, g, b = image.split() - # 混合原始图像与修改过的灰度图 - blend_factor = 0.2 # 混合强度 - r = Image.blend(r, modified_image, blend_factor) - g = Image.blend(g, modified_image, blend_factor) - b = Image.blend(b, modified_image, blend_factor) - return Image.merge('RGB', (r, g, b)) - else: - return modified_image - - def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image: - """ - 扰动图像的颜色直方图,对抗基于颜色统计的图像匹配 - - Args: - image: 输入图像 - strength: 扰动强度(0-1) - - Returns: - 处理后的图像 - """ - # 确保为RGB模式 - if image.mode != 'RGB': - image = image.convert('RGB') - - # 转为numpy数组 - img_array = np.array(image) - height, width, channels = img_array.shape - - # 对每个通道分别处理 - for channel in range(channels): - # 计算当前通道的直方图 - hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256)) - - # 找出主要颜色区间 (频率高的区间) - threshold = np.percentile(hist, 70) # 取前30%的颜色块 - significant_bins = np.where(hist > threshold)[0] - - if len(significant_bins) > 0: - for bin_idx in significant_bins: - # 计算当前bin对应的颜色范围 - bin_width = 256 // 64 - color_low = bin_idx * bin_width - color_high = (bin_idx + 1) * bin_width - - # 创建颜色范围掩码 - mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high) - - if np.any(mask): - # 生成随机偏移值 - offset = int(strength * bin_width * (random.random() - 0.5) * 2) - - # 应用偏移,确保在0-255范围内 - img_array[:,:,channel][mask] = np.clip( - img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8) - - # 转回PIL图像 - return Image.fromarray(img_array) - - def strip_metadata(self, image: Image.Image) -> Image.Image: - """ - 移除图像中的所有元数据 - - Args: - image: 输入图像 - - Returns: - 无元数据的图像 - """ - # 创建无元数据的副本 - data = io.BytesIO() - image.save(data, format=image.format if image.format else 'PNG') - return Image.open(data) + return self.add_dct_noise(image, intensity=intensity) def optimize_anti_hash_methods(self, image: Image.Image, strength: str = "medium") -> Image.Image: """ - 综合优化的哈希对抗方法,针对性处理aHash、pHash和dHash - - Args: - image: 输入图像 - strength: 处理强度 - "low", "medium", "high" - - Returns: - 处理后的图像 + 综合优化的哈希对抗方法,强度已增加 """ - # 根据强度设置参数 + # 根据强度设置参数 (显著增加 high 强度) if strength == "low": ahash_intensity = 0.03 - phash_intensity = 0.04 + phash_intensity = 0.05 # 基础DCT噪声强度 dhash_intensity = 0.03 + region_flip_prob = 0.3 + num_ahash_blocks = random.randint(8, 15) + num_dhash_lines = random.randint(6, 10) elif strength == "high": - ahash_intensity = 0.09 - phash_intensity = 0.08 - dhash_intensity = 0.09 + ahash_intensity = 0.18 # 大幅增加 + phash_intensity = 0.15 # 大幅增加 + dhash_intensity = 0.18 # 大幅增加 + region_flip_prob = 0.7 # 更大概率翻转 + num_ahash_blocks = random.randint(20, 35) # 更多块 + num_dhash_lines = random.randint(15, 25) # 更多线 else: # medium - ahash_intensity = 0.06 - phash_intensity = 0.06 - dhash_intensity = 0.06 + ahash_intensity = 0.08 # 增加 + phash_intensity = 0.08 # 增加 + dhash_intensity = 0.08 # 增加 + region_flip_prob = 0.5 + num_ahash_blocks = random.randint(12, 25) + num_dhash_lines = random.randint(10, 18) - # 1. 针对aHash (平均哈希)的处理 - 专注于整体亮度变化 - # aHash对整体亮度敏感,创建局部亮度变化可有效对抗 - img_array = np.array(image) + # 1. 针对aHash (平均哈希)的处理 - 强度已增加 + img_array = np.array(image, dtype=np.int16) h, w = img_array.shape[0], img_array.shape[1] - # 创建10-20个随机块,并调整其亮度 - num_blocks = random.randint(10, 20) - for _ in range(num_blocks): - # 随机选择块的位置和大小 + # num_ahash_blocks = random.randint(10, 20) + for _ in range(num_ahash_blocks): block_w = random.randint(w//20, w//10) block_h = random.randint(h//20, h//10) x = random.randint(0, w - block_w) y = random.randint(0, h - block_h) - # 随机调整块的亮度 - delta = int(random.uniform(-25, 25) * ahash_intensity) - if len(img_array.shape) == 3: # 彩色图像 - img_array[y:y+block_h, x:x+block_w, :] = np.clip( - img_array[y:y+block_h, x:x+block_w, :] + delta, 0, 255) - else: # 灰度图像 - img_array[y:y+block_h, x:x+block_w] = np.clip( - img_array[y:y+block_h, x:x+block_w] + delta, 0, 255) - - # 转回PIL图像 + delta = int(random.uniform(-35, 35) * ahash_intensity) # 增加delta范围 + + block = img_array[y:y+block_h, x:x+block_w] + img_array[y:y+block_h, x:x+block_w] = np.clip(block + delta, 0, 255) + image = Image.fromarray(img_array.astype(np.uint8)) - # 2. 调用现有的pHash对抗方法 + # 2. 调用强化的pHash对抗方法 image = self.add_phash_noise(image, intensity=phash_intensity) - # 3. 针对dHash (差值哈希)的处理 - 添加细微梯度扰动 - # dHash对相邻像素梯度敏感,添加小梯度干扰非常有效 - img_array = np.array(image) + # 3. 针对dHash (差值哈希)的处理 - 强度已增加 + img_array = np.array(image, dtype=np.int16) + h, w = img_array.shape[0], img_array.shape[1] - # 计算图像尺寸并创建掩码 mask = np.zeros_like(img_array, dtype=bool) - grid_size = 8 # dHash通常是8x8 - # 在图像中选择8-12条边缘线进行干扰 - num_lines = random.randint(8, 12) - for _ in range(num_lines): - # 随机决定是水平线还是垂直线 - if random.random() < 0.5: # 水平线 + # num_dhash_lines = random.randint(8, 12) + for _ in range(num_dhash_lines): + if random.random() < 0.5: y = random.randint(0, h - 1) - line_width = random.randint(1, 3) - if len(mask.shape) == 3: # 彩色图像 + line_width = random.randint(1, 4) # 增加线宽可能性 + if len(mask.shape) == 3: mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :, :] = True - else: # 灰度图像 + else: mask[max(0, y-line_width//2):min(h, y+line_width//2+1), :] = True - else: # 垂直线 + else: x = random.randint(0, w - 1) - line_width = random.randint(1, 3) - if len(mask.shape) == 3: # 彩色图像 + line_width = random.randint(1, 4) # 增加线宽可能性 + if len(mask.shape) == 3: mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1), :] = True - else: # 灰度图像 + else: mask[:, max(0, x-line_width//2):min(w, x+line_width//2+1)] = True - # 应用梯度干扰 - if len(img_array.shape) == 3: # 彩色图像 - delta = (np.random.random(img_array.shape) * 2 - 1) * dhash_intensity * 25 - for c in range(img_array.shape[2]): - img_array[:,:,c][mask[:,:,c]] += delta[:,:,c][mask[:,:,c]] - else: # 灰度图像 - delta = (np.random.random(img_array.shape) * 2 - 1) * dhash_intensity * 25 - img_array[mask] += delta[mask] + delta = (np.random.random(img_array.shape) * 2 - 1) * dhash_intensity * 35 # 增加delta范围 + img_array[mask] += delta[mask].astype(np.int16) - img_array = np.clip(img_array, 0, 255).astype(np.uint8) + img_array = np.clip(img_array, 0, 255) - # 4. 颜色直方图扰动 - image = Image.fromarray(img_array) - image = self.perturb_color_histogram(image, strength=dhash_intensity) + # 4. 颜色直方图扰动 (强度也略微增加) + image = Image.fromarray(img_array.astype(np.uint8)) + color_hist_strength = dhash_intensity * 0.6 # 关联强度 + image = self.perturb_color_histogram(image, strength=color_hist_strength) - # 5. 区域翻转 - 特别有效对抗所有哈希算法 - if strength != "low" and random.random() < 0.5: + # 5. 区域翻转 - 强度已增加 + if random.random() < region_flip_prob: img_array = np.array(image) - # 随机选择一个小区域 - region_w = random.randint(w//30, w//20) - region_h = random.randint(h//30, h//20) + h, w = img_array.shape[0], img_array.shape[1] + + # 增加区域大小可能性 + max_region_factor = 15 if strength == 'high' else 20 + region_w = random.randint(w//(max_region_factor+5), w//max_region_factor) + region_h = random.randint(h//(max_region_factor+5), h//max_region_factor) x = random.randint(0, w - region_w) y = random.randint(0, h - region_h) - # 对区域进行水平或垂直翻转 - if random.random() < 0.5: # 水平翻转 - if len(img_array.shape) == 3: # 彩色图像 - img_array[y:y+region_h, x:x+region_w, :] = img_array[y:y+region_h, x:x+region_w, :][:, ::-1, :] - else: # 灰度图像 - img_array[y:y+region_h, x:x+region_w] = img_array[y:y+region_h, x:x+region_w][:, ::-1] - else: # 垂直翻转 - if len(img_array.shape) == 3: # 彩色图像 - img_array[y:y+region_h, x:x+region_w, :] = img_array[y:y+region_h, x:x+region_w, :][::-1, :, :] - else: # 灰度图像 - img_array[y:y+region_h, x:x+region_w] = img_array[y:y+region_h, x:x+region_w][::-1, :] + # 加入90度旋转的可能性 + action = random.choice(['flip_h', 'flip_v', 'rotate_90']) if strength != 'low' else random.choice(['flip_h', 'flip_v']) + region = img_array[y:y+region_h, x:x+region_w] + if action == 'flip_h': + img_array[y:y+region_h, x:x+region_w] = region[:, ::-1] + elif action == 'flip_v': + img_array[y:y+region_h, x:x+region_w] = region[::-1, :] + elif action == 'rotate_90' and len(img_array.shape) == 3: # 旋转只对原尺寸区域有效 + # 注意:旋转可能需要调整区域大小或填充,这里简化处理 + # 仅在区域接近正方形时效果较好 + if abs(region_w - region_h) < 5: + rotated_region = np.rot90(region) + # 需要确保旋转后尺寸匹配,如果尺寸变化则跳过或填充 + if rotated_region.shape[0] == region_h and rotated_region.shape[1] == region_w: + img_array[y:y+region_h, x:x+region_w] = rotated_region + + image = Image.fromarray(img_array) + + # 6. (新增可选) 轻微高斯噪声 - 对所有哈希都有轻微普适性干扰 + if strength != 'low' and random.random() < 0.4: + img_array = np.array(image) + noise_sigma = 1.0 if strength == 'medium' else 2.0 # 噪声标准差 + noise = np.random.normal(0, noise_sigma, img_array.shape) + img_array = np.clip(img_array + noise, 0, 255).astype(np.uint8) image = Image.fromarray(img_array) return image @@ -750,6 +735,70 @@ class PosterNotesCreator: return processed_image + def perturb_color_histogram(self, image: Image.Image, strength: float = 0.03) -> Image.Image: + """ + 扰动图像的颜色直方图,对抗基于颜色统计的图像匹配 + + Args: + image: 输入图像 + strength: 扰动强度(0-1) + + Returns: + 处理后的图像 + """ + # 确保为RGB模式 + if image.mode != 'RGB': + image = image.convert('RGB') + + # 转为numpy数组 + img_array = np.array(image) + height, width, channels = img_array.shape + + # 对每个通道分别处理 + for channel in range(channels): + # 计算当前通道的直方图 + hist, _ = np.histogram(img_array[:,:,channel].flatten(), bins=64, range=(0, 256)) + + # 找出主要颜色区间 (频率高的区间) + threshold = np.percentile(hist, 70) # 取前30%的颜色块 + significant_bins = np.where(hist > threshold)[0] + + if len(significant_bins) > 0: + for bin_idx in significant_bins: + # 计算当前bin对应的颜色范围 + bin_width = 256 // 64 + color_low = bin_idx * bin_width + color_high = (bin_idx + 1) * bin_width + + # 创建颜色范围掩码 + mask = (img_array[:,:,channel] >= color_low) & (img_array[:,:,channel] < color_high) + + if np.any(mask): + # 生成随机偏移值 + offset = int(strength * bin_width * (random.random() - 0.5) * 2) + + # 应用偏移,确保在0-255范围内 + img_array[:,:,channel][mask] = np.clip( + img_array[:,:,channel][mask] + offset, 0, 255).astype(np.uint8) + + # 转回PIL图像 + return Image.fromarray(img_array) + + def strip_metadata(self, image: Image.Image) -> Image.Image: + """ + 移除图像中的所有元数据 + + Args: + image: 输入图像 + + Returns: + 无元数据的图像 + """ + # 创建无元数据的副本 + data = io.BytesIO() + image.save(data, format=image.format if image.format else 'PNG') + return Image.open(data) + def process_poster_for_notes( run_id: str, topic_index: int,