import re
import json
import os


class TopicParser:
    """Topic parser: parses and processes generated topic proposals."""

    @staticmethod
    def parse_line(line):
        """Parse a single topic and extract the content of each tag."""
        print("Parsing topic:")
        print(line)
        print("--------------------------------")
        try:
            # Clean the text: strip surrounding whitespace and line breaks
            line = line.strip()
            # Skip if the content is empty or too short
            if len(line) < 10:
                return {"error": True}
            # Extract the content of each tag
            index = TopicParser.extract_tag_content(line, "index")
            date = TopicParser.extract_tag_content(line, "date")
            logic = TopicParser.extract_tag_content(line, "logic")
            object_value = TopicParser.extract_tag_content(line, "object")
            product = TopicParser.extract_tag_content(line, "product")
            product_logic = TopicParser.extract_tag_content(line, "product_logic")
            style = TopicParser.extract_tag_content(line, "style")
            style_logic = TopicParser.extract_tag_content(line, "style_logic")
            target_audience = TopicParser.extract_tag_content(line, "target_audience")
            target_audience_logic = TopicParser.extract_tag_content(line, "target_audience_logic")
            # Verify that all required fields are present
            if not all([date, logic, product, product_logic, style, style_logic]):
                print(
                    f"Missing required fields: date={bool(date)}, logic={bool(logic)}, "
                    f"product={bool(product)}, product_logic={bool(product_logic)}, "
                    f"style={bool(style)}, style_logic={bool(style_logic)}"
                )
                return {"error": True}
            return {
                "index": index,
                "date": date,
                "logic": logic,
                "object": object_value,
                "product": product,
                "product_logic": product_logic,
                "style": style,
                "style_logic": style_logic,
                "target_audience": target_audience,
                "target_audience_logic": target_audience_logic,
                "error": False,
            }
        except Exception as e:
            print(f"Error while parsing topic: {e}")
            return {
                "index": "",
                "date": "",
                "logic": "",
                "object": "",
                "product": "",
                "product_logic": "",
                "style": "",
                "style_logic": "",
                "target_audience": "",
                "target_audience_logic": "",
                "error": True,
            }

    @staticmethod
    def extract_tag_content(text, tag_name):
        """Extract the content of the given tag from the text."""
        try:
            start_tag = f"<{tag_name}>"
            end_tag = f"</{tag_name}>"
            # Use a regex to find all matches, avoiding interference from nested tags
            pattern = f"{start_tag}(.*?){end_tag}"
            matches = re.findall(pattern, text, re.DOTALL)
            if matches:
                # Return the first match with surrounding whitespace stripped
                return matches[0].strip()
            # If the regex fails, fall back to a plain string search
            if start_tag in text and end_tag in text:
                start_index = text.index(start_tag) + len(start_tag)
                end_index = text.index(end_tag, start_index)
                content = text[start_index:end_index].strip()
                return content
            return ""
        except Exception as e:
            print(f"Error extracting content of <{tag_name}> tag: {e}")
            return ""

    @staticmethod
    def parse_topics(result):
        """Parse multiple topics and return the list of parsed topics."""
        print("\nSplitting topic results...")
        # First try splitting on "###"
        topics = result.split("###")
        # If the split produced too few pieces, the format may differ; try a "number + dot" pattern
        if len(topics) <= 2:
            print("Splitting on '###' failed, trying to split on numbered headings...")
            # Split on a "number + dot" pattern
            topics = re.split(r'\n\s*\d+\.\s*\n', result)
            # Drop a possible leading empty item
            if topics and not topics[0].strip():
                topics = topics[1:]
        # If the splits above failed, try splitting directly on tags
        if len(topics) <= 2:
            print("Numbered split failed, trying to split directly on tags...")
            # Find the positions of all <date> tags
            date_positions = [m.start() for m in re.finditer(r'<date>', result)]
            if len(date_positions) > 1:
                topics = []
                for i in range(len(date_positions)):
                    start = date_positions[i]
                    end = date_positions[i + 1] if i + 1 < len(date_positions) else len(result)
                    topic = result[start:end].strip()
                    if topic:
                        topics.append(topic)
        # If all of the above failed, try splitting on the "---" separator
        if len(topics) <= 2:
            print("Tag split failed, trying the '---' separator...")
            topics = result.split("---")
            # Drop empty items
            topics = [topic.strip() for topic in topics if topic.strip()]
            print(f"Got {len(topics)} topics after splitting on '---'")
        print(f"Initial split produced {len(topics)} topics")
        # Process each topic
        result_list = []
        for i, topic in enumerate(topics):
            if not topic.strip():
                continue
            print(f"\nProcessing topic {i + 1}")
            parsed_data = TopicParser.parse_line(topic)
            if not parsed_data["error"]:
                result_list.append(parsed_data)
            else:
                print(f"Topic {i + 1} failed to parse, skipping")
        print(f"Number of successfully parsed topics: {len(result_list)}")
        # If no topics were parsed, try another approach
        if len(result_list) == 0:
            print("All topics failed to parse, trying to re-detect the topic format...")
            # All tag patterns that may appear in a topic
            tag_patterns = [
                (r'<index>(.*?)</index>', 'index'),
                (r'<date>(.*?)</date>', 'date'),
                (r'<logic>(.*?)</logic>', 'logic'),
                (r'<object>(.*?)</object>', 'object'),
                (r'<product>(.*?)</product>', 'product'),
                (r'<product_logic>(.*?)</product_logic>', 'product_logic'),
                (r'<style>(.*?)</style>', 'style'),
                (r'<style_logic>(.*?)</style_logic>', 'style_logic'),
                (r'<target_audience>(.*?)</target_audience>', 'target_audience'),
                (r'<target_audience_logic>(.*?)</target_audience_logic>', 'target_audience_logic'),
            ]
            # Find all <date> tags to locate topic boundaries
            date_matches = re.finditer(r'<date>(.*?)</date>', result, re.DOTALL)
            topics_data = []
            for date_match in date_matches:
                date_start = date_match.start()
                date_end = date_match.end()
                date_value = date_match.group(1)
                # Starting from the current <date> tag, look for the next <date> tag or the end of the text
                next_date_match = re.search(r'<date>', result[date_end:])
                topic_end = date_end + next_date_match.start() if next_date_match else len(result)
                # Extract the text of the current topic
                topic_text = result[date_start:topic_end]
                # Parse the topic data
                topic_data = {'date': date_value, 'error': False}
                # Extract the content of the remaining tags
                for pattern, key in tag_patterns:
                    if key == 'date':
                        # date has already been extracted
                        continue
                    match = re.search(pattern, topic_text, re.DOTALL)
                    if match:
                        topic_data[key] = match.group(1)
                    else:
                        topic_data[key] = ""
                # Check that the basic required fields are present
                required_fields = ['date', 'logic', 'product', 'product_logic', 'style', 'style_logic']
                if all(topic_data.get(field, "") for field in required_fields):
                    topics_data.append(topic_data)
                    print(f"Parsed one topic: {topic_data['date']}")
                else:
                    print("Topic is missing required fields, skipping")
            result_list = topics_data
            print(f"Topics parsed via tag matching: {len(result_list)}")
        print(f"Final number of successfully parsed topics: {len(result_list)}")
        # Reassign every topic's index to guarantee uniqueness and a consistent format
        used_indices = set()
        # Step 1: keep existing indices that are valid, unused integers
        for item in result_list:
            if 'index' in item and item['index']:
                try:
                    # Try converting to an integer
                    index_value = int(item['index'].strip())
                    # If it is already taken, mark as None so it gets reassigned below
                    if index_value in used_indices:
                        item['index'] = None
                    else:
                        item['index'] = str(index_value)
                        used_indices.add(index_value)
                except (ValueError, TypeError):
                    # If the conversion fails, mark as None so it gets reassigned below
                    item['index'] = None
            else:
                item['index'] = None
        # Step 2: assign fresh values to all invalid or duplicate indices
        next_available_index = 1
        for item in result_list:
            if item['index'] is None:
                # Find the next unused index
                while next_available_index in used_indices:
                    next_available_index += 1
                item['index'] = str(next_available_index)
                used_indices.add(next_available_index)
                next_available_index += 1
        print("All topic indices have been reassigned and are unique")
        return result_list

    @staticmethod
    def save_topics(result_list, output_dir, run_id, result):
        """Save the parsed topics to a JSON file."""
        # Make sure the output directory exists
        os.makedirs(output_dir, exist_ok=True)
        # Save the result as pretty-printed JSON
        json_path = os.path.join(output_dir, f"result_list_{run_id}.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(result_list, f, ensure_ascii=False, indent=4)
        # If no topics were found, write an error log
        if len(result_list) == 0:
            error_log_file = os.path.join(output_dir, f"error_log_{run_id}.txt")
            with open(error_log_file, "w", encoding="utf-8") as f:
                f.write("Failed to parse any topics. Original content follows:\n\n")
                f.write(result)
            print(f"Topic parsing failed completely; error log written to {error_log_file}")
            return False, None
        return True, json_path

    @staticmethod
    def load_topics_from_json(json_path):
        """Load topic data from a JSON file."""
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                topics = json.load(f)
            print(f"Loaded {len(topics)} topics from {json_path}")
            return topics
        except Exception as e:
            print(f"Failed to load topic data: {e}")
            return []
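

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of how the parser might be driven end to end, assuming the
# upstream model emits topics wrapped in <index>/<date>/<logic>/... tags and
# separated by "###". The sample text, the "output" directory, and the
# "demo_run" run id are hypothetical values invented for this demonstration.
if __name__ == "__main__":
    def make_topic(i):
        # Build one fully tagged demo topic (all field values are placeholders).
        return (
            f"<index>{i}</index><date>2024-06-{i:02d}</date>"
            f"<logic>demo logic {i}</logic><object>demo object {i}</object>"
            f"<product>demo product {i}</product>"
            f"<product_logic>demo product logic {i}</product_logic>"
            f"<style>demo style {i}</style><style_logic>demo style logic {i}</style_logic>"
            f"<target_audience>demo audience {i}</target_audience>"
            f"<target_audience_logic>demo audience logic {i}</target_audience_logic>"
        )

    # Three topics joined by "###" exercise the primary split path in parse_topics.
    sample_result = "###".join(make_topic(i) for i in range(1, 4))

    parsed = TopicParser.parse_topics(sample_result)
    ok, json_path = TopicParser.save_topics(parsed, "output", "demo_run", sample_result)
    if ok:
        reloaded = TopicParser.load_topics_from_json(json_path)
        print(f"Round-trip check: {len(reloaded)} topics reloaded from {json_path}")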