# TravelContentCreator/core/topic_parser.py


import re
import json
import os


class TopicParser:
    """Topic parser class, responsible for parsing and processing generated topics."""

    @staticmethod
    def parse_line(line):
        """Parse a single topic, extracting the content of each tag."""
        print("Parsing topic:")
        print(line)
        print("--------------------------------")
        try:
            # Clean the text: strip surrounding whitespace and newlines.
            line = line.strip()
            # Skip content that is empty or too short.
            if len(line) < 10:
                return {"error": True}
            # Extract the content of each tag.
            index = TopicParser.extract_tag_content(line, "index")
            date = TopicParser.extract_tag_content(line, "date")
            logic = TopicParser.extract_tag_content(line, "logic")
            obj = TopicParser.extract_tag_content(line, "object")  # renamed locally to avoid shadowing the builtin `object`
            product = TopicParser.extract_tag_content(line, "product")
            product_logic = TopicParser.extract_tag_content(line, "product_logic")
            style = TopicParser.extract_tag_content(line, "style")
            style_logic = TopicParser.extract_tag_content(line, "style_logic")
            target_audience = TopicParser.extract_tag_content(line, "target_audience")
            target_audience_logic = TopicParser.extract_tag_content(line, "target_audience_logic")
            # Verify that all required fields are present.
            if not all([date, logic, product, product_logic, style, style_logic]):
                print(f"Missing required fields: date={bool(date)}, logic={bool(logic)}, "
                      f"product={bool(product)}, product_logic={bool(product_logic)}, "
                      f"style={bool(style)}, style_logic={bool(style_logic)}")
                return {"error": True}
            return {
                "index": index,
                "date": date,
                "logic": logic,
                "object": obj,
                "product": product,
                "product_logic": product_logic,
                "style": style,
                "style_logic": style_logic,
                "target_audience": target_audience,
                "target_audience_logic": target_audience_logic,
                "error": False
            }
        except Exception as e:
            print(f"Error while parsing topic: {e}")
            return {"index": "", "date": "", "logic": "", "object": "", "product": "",
                    "product_logic": "", "style": "", "style_logic": "",
                    "target_audience": "", "target_audience_logic": "", "error": True}

    @staticmethod
    def extract_tag_content(text, tag_name):
        """Extract the content of the given tag from the text."""
        try:
            start_tag = f"<{tag_name}>"
            end_tag = f"</{tag_name}>"
            # Use a regular expression to find all matches, avoiding interference from nested tags.
            pattern = f"{start_tag}(.*?){end_tag}"
            matches = re.findall(pattern, text, re.DOTALL)
            if matches:
                # Return the first match, stripped of surrounding whitespace.
                return matches[0].strip()
            # If the regex match fails, fall back to a plain string search.
            if start_tag in text and end_tag in text:
                start_index = text.index(start_tag) + len(start_tag)
                end_index = text.index(end_tag, start_index)
                return text[start_index:end_index].strip()
            return ""
        except Exception as e:
            print(f"Error while extracting content of tag <{tag_name}>: {e}")
            return ""

    @staticmethod
    def parse_topics(result):
        """Parse multiple topics and return the list of parsed topics."""
        print("\nSplitting topic results...")
        # First try to split on "###".
        topics = result.split("###")
        # If splitting yields at most two items, the format is probably different;
        # try splitting on numbered headings ("1.", "2.", ...) instead.
        if len(topics) <= 2:
            print("Splitting on '###' failed, trying to split on numeric indices...")
            # Split on a number followed by a dot on its own line.
            topics = re.split(r'\n\s*\d+\.\s*\n', result)
            # Drop a possible leading blank item.
            if topics and not topics[0].strip():
                topics = topics[1:]
        # If the above still fails, try splitting directly on <date> tags.
        if len(topics) <= 2:
            print("Splitting on numeric indices failed, trying to split on <date> tags...")
            # Find the positions of all <date> tags.
            date_positions = [m.start() for m in re.finditer(r'<date>', result)]
            if len(date_positions) > 1:
                topics = []
                for i in range(len(date_positions)):
                    start = date_positions[i]
                    end = date_positions[i + 1] if i + 1 < len(date_positions) else len(result)
                    topic = result[start:end].strip()
                    if topic:
                        topics.append(topic)
        # If all of the above fail, try splitting on the "---" separator.
        if len(topics) <= 2:
            print("Splitting on <date> tags failed, trying the '---' separator...")
            topics = result.split("---")
            # Drop blank items.
            topics = [topic.strip() for topic in topics if topic.strip()]
            print(f"Got {len(topics)} topics after splitting on '---'")
        print(f"Initial split produced {len(topics)} topics")
        # Parse each topic.
        result_list = []
        for i, topic in enumerate(topics):
            if not topic.strip():
                continue
            print(f"\nProcessing topic {i + 1}")
            parsed_data = TopicParser.parse_line(topic)
            if not parsed_data["error"]:
                result_list.append(parsed_data)
            else:
                print(f"Topic {i + 1} could not be parsed, skipping")
        print(f"Number of topics parsed successfully: {len(result_list)}")
        # If no topics were parsed, try an alternative approach.
        if len(result_list) == 0:
            print("All topics failed to parse, trying to re-detect the topic format...")
            # All tag patterns that may need to be extracted.
            tag_patterns = [
                (r'<index>(.*?)</index>', 'index'),
                (r'<date>(.*?)</date>', 'date'),
                (r'<logic>(.*?)</logic>', 'logic'),
                (r'<object>(.*?)</object>', 'object'),
                (r'<product>(.*?)</product>', 'product'),
                (r'<product_logic>(.*?)</product_logic>', 'product_logic'),
                (r'<style>(.*?)</style>', 'style'),
                (r'<style_logic>(.*?)</style_logic>', 'style_logic'),
                (r'<target_audience>(.*?)</target_audience>', 'target_audience'),
                (r'<target_audience_logic>(.*?)</target_audience_logic>', 'target_audience_logic'),
            ]
            # Locate every <date> tag to determine where each topic starts.
            date_matches = re.finditer(r'<date>(.*?)</date>', result, re.DOTALL)
            topics_data = []
            for date_match in date_matches:
                date_start = date_match.start()
                date_end = date_match.end()
                date_value = date_match.group(1).strip()
                # Search from the end of the current <date> tag for the next one, or the end of the text.
                next_date_match = re.search(r'<date>', result[date_end:])
                topic_end = date_end + next_date_match.start() if next_date_match else len(result)
                # Extract the text of the current topic.
                topic_text = result[date_start:topic_end]
                # Parse the topic data.
                topic_data = {'date': date_value, 'error': False}
                # Extract the content of the remaining tags.
                for pattern, key in tag_patterns:
                    if key == 'date':  # date has already been extracted above
                        continue
                    match = re.search(pattern, topic_text, re.DOTALL)
                    topic_data[key] = match.group(1).strip() if match else ""
                # Check that the basic required fields are present.
                required_fields = ['date', 'logic', 'product', 'product_logic', 'style', 'style_logic']
                if all(topic_data.get(field, "") for field in required_fields):
                    topics_data.append(topic_data)
                    print(f"Successfully parsed a topic: {topic_data['date']}")
                else:
                    print("Topic is missing required fields, skipping")
            result_list = topics_data
            print(f"Number of topics parsed via tag matching: {len(result_list)}")
        print(f"Final number of topics parsed successfully: {len(result_list)}")
        # Reassign indices so that every topic has a unique, well-formed index.
        used_indices = set()
        # Step 1: keep existing indices that parse as unique integers.
        for item in result_list:
            if 'index' in item and item['index']:
                try:
                    # Try to convert the index to an integer.
                    index_value = int(item['index'].strip())
                    # If the index is already taken, mark it as None so it is reassigned below.
                    if index_value in used_indices:
                        item['index'] = None
                    else:
                        item['index'] = str(index_value)
                        used_indices.add(index_value)
                except (ValueError, TypeError):
                    # Conversion failed; mark as None so it is reassigned below.
                    item['index'] = None
            else:
                item['index'] = None
        # Step 2: assign new values to all invalid or duplicate indices.
        next_available_index = 1
        for item in result_list:
            if item['index'] is None:
                # Find the next unused index.
                while next_available_index in used_indices:
                    next_available_index += 1
                item['index'] = str(next_available_index)
                used_indices.add(next_available_index)
                next_available_index += 1
        print("All topic indices have been reassigned to ensure uniqueness")
        return result_list
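
    # Illustrative shape of the list returned by parse_topics (values made up):
    #   [{"index": "1", "date": "2024-06-01", "logic": "...", "object": "...",
    #     "product": "...", "product_logic": "...", "style": "...", "style_logic": "...",
    #     "target_audience": "...", "target_audience_logic": "...", "error": False}, ...]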

    @staticmethod
    def save_topics(result_list, output_dir, run_id, result):
        """Save the parsed topics to a JSON file."""
        # Make sure the output directory exists.
        os.makedirs(output_dir, exist_ok=True)
        # Save the results as formatted JSON.
        json_path = os.path.join(output_dir, f"result_list_{run_id}.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(result_list, f, ensure_ascii=False, indent=4)
        # If no topics were found, write an error log.
        if len(result_list) == 0:
            error_log_file = os.path.join(output_dir, f"error_log_{run_id}.txt")
            with open(error_log_file, "w", encoding="utf-8") as f:
                f.write("Could not parse any topics. Original content follows:\n\n")
                f.write(result)
            print(f"Topic parsing failed completely, error log written to {error_log_file}")
            return False, None
        return True, json_path

    @staticmethod
    def load_topics_from_json(json_path):
        """Load topic data from a JSON file."""
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                topics = json.load(f)
            print(f"Loaded {len(topics)} topics from {json_path}")
            return topics
        except Exception as e:
            print(f"Failed to load topic data: {e}")
            return []
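

# Minimal usage sketch (an assumption, not part of the original module: the sample
# string below and the "output" directory / "demo_run" id are made-up illustrations).
if __name__ == "__main__":
    sample_result = (
        "<index>1</index><date>2024-06-01</date><logic>example logic</logic>"
        "<object>example object</object><product>example product</product>"
        "<product_logic>example product logic</product_logic><style>example style</style>"
        "<style_logic>example style logic</style_logic>"
        "<target_audience>example audience</target_audience>"
        "<target_audience_logic>example audience logic</target_audience_logic>"
    )
    # Parse the raw text into topic dicts, save them, then reload them from disk.
    parsed = TopicParser.parse_topics(sample_result)
    ok, path = TopicParser.save_topics(parsed, "output", "demo_run", sample_result)
    if ok:
        print(TopicParser.load_topics_from_json(path))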