265 lines
12 KiB
Python
265 lines
12 KiB
Python
|
|
import re
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TopicParser:
    """Topic parser: splits raw text into topics and extracts their tagged fields.

    A topic is a run of text carrying ``<tag>value</tag>`` pairs for the tags
    listed in ``_FIELDS``. All methods are static; the class is only a
    namespace. Progress and diagnostics are reported with ``print``.
    """

    # Tags extracted for every topic, in the key order of the returned dicts.
    _FIELDS = (
        "index",
        "date",
        "logic",
        "object",
        "product",
        "product_logic",
        "style",
        "style_logic",
        "target_audience",
        "target_audience_logic",
    )

    # Tags that must be non-empty for a topic to count as successfully parsed.
    _REQUIRED_FIELDS = (
        "date",
        "logic",
        "product",
        "product_logic",
        "style",
        "style_logic",
    )

    @staticmethod
    def parse_line(line):
        """Parse the text of a single topic into a dict of tag values.

        Args:
            line: Text of one topic containing ``<tag>...</tag>`` pairs.

        Returns:
            On success, a dict with one key per entry of ``_FIELDS`` (missing
            optional tags map to ``""``) plus ``"error": False``.
            ``{"error": True}`` when the stripped text is shorter than 10
            characters or any required field is empty.
            A dict with every field set to ``""`` and ``"error": True`` when
            an unexpected exception occurs (for example ``line`` is not a
            string).
        """
        print("正在解析选题:")
        print(line)
        print("--------------------------------")
        try:
            # Drop surrounding whitespace and newlines before measuring.
            line = line.strip()
            # Too short to hold a topic: skip it.
            if len(line) < 10:
                return {"error": True}

            fields = {
                name: TopicParser.extract_tag_content(line, name)
                for name in TopicParser._FIELDS
            }

            # Every required field must be present and non-empty.
            if not all(fields[name] for name in TopicParser._REQUIRED_FIELDS):
                presence = ", ".join(
                    f"{name}={bool(fields[name])}"
                    for name in TopicParser._REQUIRED_FIELDS
                )
                print(f"缺少必要字段: {presence}")
                return {"error": True}

            fields["error"] = False
            return fields
        except Exception as e:
            # Per-topic boundary: one malformed topic must not abort the
            # whole batch, so report the problem and return an error record.
            print(f"解析选题时出错: {e}")
            failed = {name: "" for name in TopicParser._FIELDS}
            failed["error"] = True
            return failed

    @staticmethod
    def extract_tag_content(text, tag_name):
        """Return the stripped content of the first ``<tag_name>`` element.

        Args:
            text: Text to search.
            tag_name: Tag name without angle brackets.

        Returns:
            The content between the first ``<tag_name>`` and the nearest
            following ``</tag_name>``, stripped of surrounding whitespace.
            ``""`` when no such pair exists or the arguments are not strings.
        """
        try:
            # Escape the name so it is always matched literally.
            tag = re.escape(tag_name)
            # Non-greedy + DOTALL: stop at the first closing tag, and allow
            # the content to span several lines.
            found = re.search(f"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        except (TypeError, re.error) as e:
            print(f"提取{tag_name}标签内容时出错: {e}")
            return ""
        return found.group(1).strip() if found else ""

    @staticmethod
    def _split_topics(result):
        """Cut ``result`` into per-topic chunks, trying several separators.

        Strategies are tried in order (``###``, numbered lines, ``<date>``
        tags, ``---``) while the current split has at most two chunks. A
        fallback replaces the current split only if it is not worse, so a
        valid two-topic split is never thrown away.
        """

        def usable(chunks):
            # Number of chunks that contain something besides whitespace.
            return sum(1 for chunk in chunks if chunk.strip())

        # First choice: "###" separators.
        topics = result.split("###")
        # True while `topics` comes from the <date> split, which cuts at the
        # tag itself: text before the first <date> is dropped and text before
        # every later <date> stays with the previous chunk.
        cut_at_date_tags = False

        # Two or fewer pieces is treated as "separator not really used".
        # NOTE(review): presumably the first piece is often a preamble;
        # confirm against the producer of this text.
        if len(topics) <= 2:
            print("使用###分割失败,尝试按数字序号分割...")
            # Split on lines that contain only "<number>.".
            numbered = re.split(r'\n\s*\d+\.\s*\n', result)
            # Drop the blank piece in front of the first number.
            if numbered and not numbered[0].strip():
                numbered = numbered[1:]
            if usable(numbered) > usable(topics):
                topics = numbered

        if len(topics) <= 2:
            print("按数字分割失败,尝试直接按<date>标签分割...")
            starts = [m.start() for m in re.finditer(r'<date>', result)]
            if len(starts) > 1:
                bounds = starts + [len(result)]
                by_date = [
                    result[begin:end].strip()
                    for begin, end in zip(bounds, bounds[1:])
                ]
                by_date = [chunk for chunk in by_date if chunk]
                # Strictly better only: see the note on cut_at_date_tags.
                if len(by_date) > usable(topics):
                    topics = by_date
                    cut_at_date_tags = True

        if len(topics) <= 2:
            print("按<date>标签分割失败,尝试使用分隔符'---'分割...")
            dashed = [chunk.strip() for chunk in result.split("---") if chunk.strip()]
            current = usable(topics)
            # A real separator beats the <date> cut on a tie.
            if len(dashed) > current or (cut_at_date_tags and len(dashed) == current):
                topics = dashed
            print(f"使用分隔符'---'分割后得到 {len(topics)} 个选题")

        return topics

    @staticmethod
    def _scan_by_date_tags(result):
        """Last-resort parse: treat every ``<date>...</date>`` as a topic start.

        Each topic spans from its ``<date>`` tag to the next ``<date>`` tag
        (or the end of the text). Only topics with all required fields are
        returned.
        """
        topics_data = []
        for date_match in re.finditer(r'<date>(.*?)</date>', result, re.DOTALL):
            # The topic ends where the next <date> tag opens.
            next_start = result.find('<date>', date_match.end())
            segment_end = next_start if next_start != -1 else len(result)
            segment = result[date_match.start():segment_end]

            topic_data = {
                name: TopicParser.extract_tag_content(segment, name)
                for name in TopicParser._FIELDS
            }
            topic_data["error"] = False

            if all(topic_data[name] for name in TopicParser._REQUIRED_FIELDS):
                topics_data.append(topic_data)
                print(f"成功解析一个选题: {topic_data['date']}")
            else:
                print("选题缺少必要字段,跳过")
        return topics_data

    @staticmethod
    def _assign_indices(topics):
        """Give every topic a unique ``index`` (decimal string), in place.

        An existing index is kept when it parses as an integer that no
        earlier topic already uses; every other topic receives the smallest
        positive integer not yet taken, in list order.
        """
        used_indices = set()

        # Pass 1: keep valid, not-yet-used indices; mark the rest with None.
        for item in topics:
            raw = item.get("index")
            try:
                value = int(str(raw).strip()) if raw else None
            except (ValueError, TypeError):
                value = None
            if value is None or value in used_indices:
                item["index"] = None
            else:
                item["index"] = str(value)
                used_indices.add(value)

        # Pass 2: hand out the free numbers.
        next_available_index = 1
        for item in topics:
            if item["index"] is None:
                while next_available_index in used_indices:
                    next_available_index += 1
                item["index"] = str(next_available_index)
                used_indices.add(next_available_index)
                next_available_index += 1

    @staticmethod
    def parse_topics(result):
        """Parse a text holding several topics into a list of topic dicts.

        Args:
            result: Raw text containing the topics.

        Returns:
            A list of dicts as returned by a successful ``parse_line`` call,
            each with a unique ``index`` string. Empty when nothing could be
            parsed or ``result`` is not a non-blank string.
        """
        print("\n开始拆解选题结果...")

        # Nothing to split: avoid raising on None or blank input.
        if not isinstance(result, str) or not result.strip():
            print(f"最终成功解析选题数量:{0}")
            return []

        topics = TopicParser._split_topics(result)
        print(f"初步分割得到 {len(topics)} 个选题")

        # Parse every chunk; chunks that fail are reported and skipped.
        result_list = []
        for number, topic in enumerate(topics, start=1):
            if not topic.strip():
                continue

            print(f"\n处理第 {number} 个选题")
            parsed_data = TopicParser.parse_line(topic)
            if parsed_data["error"]:
                print(f"选题 {number} 解析失败,跳过")
            else:
                result_list.append(parsed_data)

        print(f"成功解析选题数量:{len(result_list)}")

        # No chunk parsed: the separators probably cut through the tags, so
        # rebuild the topics from the <date> tags of the whole text.
        if not result_list:
            print("所有选题解析失败,尝试重新识别选题格式...")
            result_list = TopicParser._scan_by_date_tags(result)
            print(f"通过标签匹配,成功解析选题数量:{len(result_list)}")

        print(f"最终成功解析选题数量:{len(result_list)}")

        # Re-number so that every topic has a unique, well-formed index.
        TopicParser._assign_indices(result_list)
        print("所有选题的索引已重新分配,确保唯一性")
        return result_list

    @staticmethod
    def save_topics(result_list, output_dir, run_id, result):
        """Write the parsed topics to ``result_list_<run_id>.json``.

        The JSON file is written even when ``result_list`` is empty; in that
        case the raw text is additionally written to
        ``error_log_<run_id>.txt`` in the same directory.

        Args:
            result_list: Parsed topics to serialise.
            output_dir: Directory for the output files; created if missing.
            run_id: Identifier embedded in the file names.
            result: Raw text the topics were parsed from, used for the
                error log.

        Returns:
            ``(True, json_path)`` when at least one topic was saved,
            ``(False, None)`` otherwise.
        """
        # Make sure the output directory exists.
        os.makedirs(output_dir, exist_ok=True)

        # Pretty-printed JSON, keeping non-ASCII text readable.
        json_path = os.path.join(output_dir, f"result_list_{run_id}.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(result_list, f, ensure_ascii=False, indent=4)

        if result_list:
            return True, json_path

        # Nothing was parsed: keep the raw text for later inspection.
        error_log_file = os.path.join(output_dir, f"error_log_{run_id}.txt")
        with open(error_log_file, "w", encoding="utf-8") as f:
            f.write("无法解析任何选题,原始内容如下:\n\n")
            f.write("" if result is None else str(result))
        print(f"选题解析完全失败,已记录错误日志到 {error_log_file}")
        return False, None

    @staticmethod
    def load_topics_from_json(json_path):
        """Load topics previously written as JSON.

        Args:
            json_path: Path of the JSON file to read.

        Returns:
            The decoded JSON content, or ``[]`` when the file cannot be read
            or decoded.
        """
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                topics = json.load(f)
            print(f"成功从{json_path}加载{len(topics)}个选题")
            return topics
        except (OSError, ValueError, TypeError) as e:
            # OSError: unreadable file; ValueError: invalid JSON or encoding;
            # TypeError: bad path type or content without a length.
            print(f"加载选题数据失败: {e}")
            return []
|