352 lines
14 KiB
Python
352 lines
14 KiB
Python
import os
|
||
import json
|
||
import openpyxl
|
||
from openpyxl import Workbook
|
||
import logging
|
||
from collections import defaultdict
|
||
import difflib
|
||
import re
|
||
import markdown
|
||
import html
|
||
from bs4 import BeautifulSoup
|
||
|
||
# Configure logging for the whole script: INFO level, with timestamp,
# logger name and level in front of every message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger shared by every function in this file.
logger = logging.getLogger("CompareResults")
|
||
|
||
def _list_subdirs(parent_dir):
    """Return the full paths of the immediate sub-directories of ``parent_dir``.

    ``os.listdir`` returns entries in arbitrary order, so the names are sorted
    to keep the spreadsheet row order reproducible between runs.
    """
    candidates = (
        os.path.join(parent_dir, name) for name in sorted(os.listdir(parent_dir))
    )
    return [path for path in candidates if os.path.isdir(path)]


def _write_json_row(json_path, worksheet, row_idx, folder_name, all_data, data_key, label):
    """Copy one JSON file into ``worksheet`` and remember its parsed content.

    Parameters:
        json_path: path of the JSON file to read
        worksheet: worksheet that receives the row
        row_idx: 1-based row to write to
        folder_name: value written to column A and used as key in ``all_data``
        all_data: mapping ``folder_name -> {data_key: parsed JSON}``, updated
            in place only when the file was processed successfully
        data_key: key under which the parsed JSON is stored ('origin'/'detect')
        label: text inserted into the success log message

    A missing file is logged as a warning; any error while reading or writing
    is logged and swallowed so the remaining folders are still processed.
    """
    if not os.path.exists(json_path):
        logger.warning(f"文件不存在: {json_path}")
        return

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            parsed = json.load(f)
        # Column A: folder name, column B: the JSON re-serialised as one string.
        worksheet.cell(row=row_idx, column=1).value = folder_name
        worksheet.cell(row=row_idx, column=2).value = json.dumps(parsed, ensure_ascii=False)
        # Keep the parsed object for the merged workbook and the diff report.
        all_data[folder_name][data_key] = parsed
        logger.info(f"成功添加{label}: {folder_name}")
    except Exception as e:
        logger.error(f"处理文件 {json_path} 时出错: {e}")


def create_comparison_files(base_dir):
    """Build the Excel workbooks used to compare original and detected content.

    Every sub-directory of ``base_dir`` whose name contains an underscore is
    treated as a date folder. Each sub-directory of a date folder is a content
    folder that may hold ``article.json`` (original) and
    ``article_detect.json`` (after detection). One spreadsheet row is reserved
    per content folder, at the same row index in both workbooks, so a missing
    file leaves that row empty in the corresponding workbook.

    The workbooks are saved next to this script as ``compare_origin.xlsx`` and
    ``compare_detect.xlsx``; afterwards the merged workbook and the visual
    diff report are generated from the collected data.

    Parameters:
        base_dir: base directory containing the date folders

    Returns:
        Tuple ``(origin_file, detect_file, merged_file)`` with the paths of
        the three saved workbooks.
    """
    origin_wb = Workbook()
    detect_wb = Workbook()
    origin_ws = origin_wb.active
    detect_ws = detect_wb.active

    # Header row: column A keeps the placeholder "A", column B names the content.
    origin_ws.append(["A", "原文"])
    detect_ws.append(["A", "检测后"])

    # Parsed JSON per content folder, consumed by the merge and report steps.
    # NOTE(review): the key is the content-folder name only, so equal names
    # under different date folders share one entry and the one processed last
    # wins - confirm whether that is intended.
    all_data = defaultdict(dict)

    # Date folders are presumably named like "YYYY-MM-DD_HH-MM-SS"; only the
    # presence of an underscore is actually checked.
    date_folders = [
        path for path in _list_subdirs(base_dir) if "_" in os.path.basename(path)
    ]
    logger.info(f"找到 {len(date_folders)} 个日期文件夹")

    row_idx = 2  # row 1 holds the header
    for date_folder in date_folders:
        logger.info(f"处理文件夹: {date_folder}")

        # Content folders (e.g. 1_1, 1_2, 2_1): every sub-directory counts.
        for content_folder in _list_subdirs(date_folder):
            folder_name = os.path.basename(content_folder)
            logger.info(f"处理内容文件夹: {folder_name}")

            _write_json_row(
                os.path.join(content_folder, "article.json"),
                origin_ws, row_idx, folder_name, all_data, 'origin', "原始内容",
            )
            _write_json_row(
                os.path.join(content_folder, "article_detect.json"),
                detect_ws, row_idx, folder_name, all_data, 'detect', "检测后内容",
            )

            # Advance even when a file was missing so both sheets stay aligned.
            row_idx += 1

    # Narrow index column, wide content column.
    for worksheet in (origin_ws, detect_ws):
        worksheet.column_dimensions['A'].width = 15
        worksheet.column_dimensions['B'].width = 100

    # Outputs are written to the directory that contains this script.
    output_dir = os.path.dirname(os.path.abspath(__file__))
    origin_file = os.path.join(output_dir, "compare_origin.xlsx")
    detect_file = os.path.join(output_dir, "compare_detect.xlsx")

    origin_wb.save(origin_file)
    detect_wb.save(detect_file)

    logger.info(f"原始内容已保存到: {origin_file}")
    logger.info(f"检测后内容已保存到: {detect_file}")

    # Side-by-side workbook and HTML diff report built from the same data.
    merged_file = create_merged_file(all_data, output_dir)
    generate_visual_diff_report(all_data, output_dir)

    return origin_file, detect_file, merged_file
|
||
|
||
def create_merged_file(all_data, output_dir):
    """Write one workbook that shows original and detected content side by side.

    Parameters:
        all_data: mapping of folder name to a dict that may contain the parsed
            JSON under the keys 'origin' and/or 'detect'
        output_dir: directory in which ``merge_result.xlsx`` is saved

    Returns:
        Path of the saved workbook.
    """
    workbook = Workbook()
    sheet = workbook.active

    # Header row: index, original text, text after detection.
    sheet.append(["索引", "原文", "检测后"])

    # Which data key goes into which column; a missing key leaves the cell empty.
    key_columns = (('origin', 2), ('detect', 3))

    # One row per folder, ordered by folder name, starting below the header.
    for row_number, (folder_name, data) in enumerate(sorted(all_data.items()), start=2):
        sheet.cell(row=row_number, column=1, value=folder_name)
        for key, column in key_columns:
            if key in data:
                serialized = json.dumps(data[key], ensure_ascii=False)
                sheet.cell(row=row_number, column=column, value=serialized)

    # Narrow index column, wide content columns.
    for letter, width in (('A', 15), ('B', 100), ('C', 100)):
        sheet.column_dimensions[letter].width = width

    merged_file = os.path.join(output_dir, "merge_result.xlsx")
    workbook.save(merged_file)

    logger.info(f"合并内容已保存到: {merged_file}")
    return merged_file
|
||
|
||
def generate_visual_diff_html(text1, text2):
    """Render a character-level diff of two texts as two HTML fragments.

    Parameters:
        text1: original text (``None`` is treated as empty text)
        text2: text to compare against (``None`` is treated as empty text)

    Returns:
        Tuple ``(html1, html2)``. ``html1`` is ``text1`` with deleted or
        replaced characters wrapped in a red-background ``<span>``; ``html2``
        is ``text2`` with inserted or replacing characters wrapped in a
        green-background ``<span>``. All text, changed or not, is
        HTML-escaped so the fragments can be embedded in a page as they are.
    """
    removed_span = '<span style="background-color:#fdd;">{}</span>'
    added_span = '<span style="background-color:#dfd;">{}</span>'

    # Compare character by character.
    chars1 = list(text1) if text1 is not None else []
    chars2 = list(text2) if text2 is not None else []

    # NOTE(review): SequenceMatcher's default autojunk heuristic applies to
    # sequences of 200+ items and may coarsen diffs of long texts; left at the
    # default to keep the existing output and running time.
    sequence_matcher = difflib.SequenceMatcher(None, chars1, chars2)

    html1 = []
    html2 = []

    for tag, i1, i2, j1, j2 in sequence_matcher.get_opcodes():
        left = html.escape(''.join(chars1[i1:i2]))
        right = html.escape(''.join(chars2[j1:j2]))

        if tag == 'equal':
            # Unchanged text must be escaped too, otherwise characters such as
            # '<' or '&' in the source text would be interpreted as markup.
            html1.append(left)
            html2.append(right)
        elif tag == 'delete':
            # Present only in the original: highlight in red.
            html1.append(removed_span.format(left))
        elif tag == 'replace':
            # Old text in red on the left, new text in green on the right.
            html1.append(removed_span.format(left))
            html2.append(added_span.format(right))
        elif tag == 'insert':
            # Present only in the compared text: highlight in green.
            html2.append(added_span.format(right))

    return ''.join(html1), ''.join(html2)
|
||
|
||
# Static head of the report page: document type, styles, page title and legend.
_REPORT_PAGE_HEAD = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>内容比较差异报告</title>
<style>
body { font-family: Arial, sans-serif; line-height: 1.6; margin: 20px; }
.container { display: flex; flex-wrap: wrap; }
.column { flex: 1; min-width: 45%; margin: 10px; border: 1px solid #ddd; padding: 15px; border-radius: 5px; }
h1 { color: #333; text-align: center; }
h2 { color: #444; margin-top: 20px; padding-bottom: 5px; border-bottom: 1px solid #eee; }
h3 { color: #555; }
.header { background-color: #f5f5f5; padding: 10px; border-bottom: 1px solid #ddd; font-weight: bold; }
.content { margin-top: 10px; }
.diff-table { width: 100%; border-collapse: collapse; }
.diff-table td { vertical-align: top; padding: 10px; border: 1px solid #ddd; }
.column-title { font-weight: bold; background-color: #f5f5f5; padding: 5px; text-align: center; }
.item { margin-bottom: 30px; }
.no-change { color: #666; font-style: italic; }
pre { white-space: pre-wrap; word-wrap: break-word; }
.section-title { font-weight: bold; margin: 20px 0 10px 0; padding: 5px; background-color: #f0f0f0; }
</style>
</head>
<body>
<h1>内容比较差异报告</h1>
<p style="text-align: center;">本报告显示原始内容与检测后内容的差异对比。红色背景表示删除的部分,绿色背景表示添加的部分。</p>
"""


def _as_report_text(value):
    """Coerce a JSON field to text: ``None`` becomes '', non-strings go through ``str``."""
    if value is None:
        return ''
    return value if isinstance(value, str) else str(value)


def _render_diff_section(section_title, origin_header, detect_header,
                         origin_text, detect_text, preformatted=False):
    """Return the HTML lines of one two-column comparison table.

    Identical texts are shown once, spanning both columns, with a "no change"
    marker; different texts are shown side by side with highlighted changes.
    With ``preformatted`` the cell text is wrapped in ``<pre>``.
    """
    open_tag, close_tag = ("<pre>", "</pre>") if preformatted else ("", "")

    lines = [
        f"<div class='section-title'>{section_title}</div>",
        "<table class='diff-table'>",
        "<tr>",
        f"<td width='50%' class='column-title'>{origin_header}</td>",
        f"<td width='50%' class='column-title'>{detect_header}</td>",
        "</tr>",
        "<tr>",
    ]

    if origin_text == detect_text:
        lines.append(
            f"<td colspan='2' class='no-change'>{open_tag}{html.escape(origin_text)}"
            f"{close_tag}<br><small>(无变化)</small></td>"
        )
    else:
        diff_origin, diff_detect = generate_visual_diff_html(origin_text, detect_text)
        lines.append(f"<td>{open_tag}{diff_origin}{close_tag}</td>")
        lines.append(f"<td>{open_tag}{diff_detect}{close_tag}</td>")

    lines.append("</tr>")
    lines.append("</table>")
    return lines


def generate_visual_diff_report(all_data, output_dir):
    """Write an HTML report comparing original and detected title and content.

    Only entries that contain both an 'origin' and a 'detect' object are
    reported. For each of them the 'title' and 'content' fields are compared;
    a missing or ``null`` field is treated as empty text and a non-string
    value is converted with ``str``. Entries whose JSON root is not an object
    are skipped with a warning.

    Parameters:
        all_data: mapping of folder name to a dict with the parsed JSON under
            the keys 'origin' and 'detect'
        output_dir: directory in which ``visual_diff_report.html`` is saved

    Returns:
        Path of the saved report.
    """
    logger.info("开始生成可视化差异报告...")

    html_output = [_REPORT_PAGE_HEAD]

    for folder_name, data in sorted(all_data.items()):
        if 'origin' not in data or 'detect' not in data:
            continue

        origin_data = data['origin']
        detect_data = data['detect']

        # Title and content are read with .get(), which needs JSON objects.
        if not (isinstance(origin_data, dict) and isinstance(detect_data, dict)):
            logger.warning(f"跳过 {folder_name}: JSON 内容不是对象,无法对比标题和内容")
            continue

        html_output.append("<div class='item'>")
        # The folder name comes from the file system, so escape it as well.
        html_output.append(f"<h2>索引: {html.escape(str(folder_name))}</h2>")

        # Title comparison.
        html_output.extend(_render_diff_section(
            "标题对比", "原始标题", "检测后标题",
            _as_report_text(origin_data.get('title', '')),
            _as_report_text(detect_data.get('title', '')),
        ))

        # Content comparison, kept preformatted so line breaks stay visible.
        html_output.extend(_render_diff_section(
            "内容对比", "原始内容", "检测后内容",
            _as_report_text(origin_data.get('content', '')),
            _as_report_text(detect_data.get('content', '')),
            preformatted=True,
        ))

        html_output.append("</div>")
        html_output.append("<hr>")

    html_output.append("</body></html>")

    report_file = os.path.join(output_dir, "visual_diff_report.html")
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("\n".join(html_output))

    logger.info(f"可视化差异报告已保存到: {report_file}")
    return report_file
|
||
|
||
if __name__ == "__main__":
    import sys

    # Base directory holding the date folders. The first command-line argument
    # overrides the built-in default, so the script can be pointed at other
    # data sets without editing the source.
    default_base_directory = "/root/autodl-tmp/Content_detector/齐云山"
    base_directory = sys.argv[1] if len(sys.argv) > 1 else default_base_directory

    # The directory is listed with os.listdir(), so a path that exists but is
    # a regular file must be rejected here as well.
    if not os.path.isdir(base_directory):
        logger.error(f"目录不存在: {base_directory}")
    else:
        # Build the comparison workbooks, the merged workbook and the report.
        origin_file, detect_file, merged_file = create_comparison_files(base_directory)
        logger.info("比较文件创建完成!")
|