201 lines
10 KiB
Python
201 lines
10 KiB
Python
import re
|
||
import json
|
||
import os
|
||
import traceback # Import traceback for better error logging
|
||
from datetime import datetime
|
||
|
||
|
||
class TopicParser:
|
||
"""选题解析器类,负责解析和处理选题"""
|
||
|
||
@staticmethod
|
||
def parse_topics(result, run_id=None):
|
||
"""解析 AI 返回的 JSON 格式的选题列表"""
|
||
# 如果没有提供run_id,生成一个新的
|
||
if run_id is None:
|
||
run_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||
|
||
# 保存原始LLM响应
|
||
save_dir = os.path.join("result", run_id)
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
raw_response_path = os.path.join(save_dir, f"raw_llm_response_{run_id}.txt")
|
||
try:
|
||
with open(raw_response_path, "w", encoding="utf-8") as f:
|
||
f.write(result)
|
||
print(f"原始LLM响应已保存到:{raw_response_path}")
|
||
except Exception as e:
|
||
print(f"保存原始LLM响应失败:{e}")
|
||
|
||
# --- Debug: Print raw input ---
|
||
print("--- Raw Input to parse_topics ---")
|
||
print(repr(result)) # Use repr() to see special characters like \n
|
||
print("--- End Raw Input ---")
|
||
# --- End Debug ---
|
||
|
||
print("\n开始解析 JSON 格式的选题结果...")
|
||
result_list = []
|
||
try:
|
||
# --- Try to remove <think> block first ---
|
||
potential_json_part = result # Start with the original result
|
||
if "</think>" in result:
|
||
parts = result.split("</think>", 1) # Split only once
|
||
if len(parts) > 1:
|
||
print("检测到并移除了 '</think>' 标签前的部分。")
|
||
potential_json_part = parts[1] # Take the part after </think>
|
||
# --- End of <think> removal ---
|
||
|
||
# 尝试直接将结果(或</think>之后的部分)解析为 JSON
|
||
# 先移除可能的Markdown代码块标记 ```json ... ```
|
||
cleaned_result = re.sub(r'^```json\s*|\s*```$', '', potential_json_part.strip(), flags=re.MULTILINE) # Apply to potential_json_part
|
||
|
||
if not cleaned_result: # Handle case where everything was removed
|
||
print("错误:移除 <think> 和/或 markdown 标记后内容为空。")
|
||
return []
|
||
|
||
parsed_json = json.loads(cleaned_result)
|
||
|
||
# 验证解析结果是否为列表
|
||
if isinstance(parsed_json, list):
|
||
print(f"成功解析 JSON,包含 {len(parsed_json)} 个潜在选题。")
|
||
required_keys = {"index", "date", "logic", "object", "product", "product_logic", "style", "style_logic", "target_audience", "target_audience_logic"}
|
||
|
||
for i, item in enumerate(parsed_json):
|
||
# 验证每个元素是否为字典并包含所有必需的键
|
||
if isinstance(item, dict) and required_keys.issubset(item.keys()):
|
||
# 确保所有值都是字符串,以防 AI 返回非字符串类型
|
||
valid_item = {}
|
||
for k, v in item.items():
|
||
# 转换为字符串,并检查文件扩展名
|
||
str_v = str(v)
|
||
# 如果值包含文件扩展名(如.txt, .json, .md等),可能是路径错误
|
||
if k in ["style", "target_audience", "product"] and (str_v.endswith('.txt') or
|
||
str_v.endswith('.json') or
|
||
str_v.endswith('.md')):
|
||
print(f"警告:检测到 {k} 字段可能被错误赋值为文件路径:{str_v}")
|
||
# 仅保留文件名(不含扩展名)作为值
|
||
base_name = os.path.splitext(os.path.basename(str_v))[0]
|
||
print(f" 将其修正为:{base_name}")
|
||
valid_item[str(k)] = base_name
|
||
else:
|
||
valid_item[str(k)] = str_v
|
||
|
||
# 添加 'error' 字段以兼容旧接口(如果需要)
|
||
valid_item['error'] = False
|
||
result_list.append(valid_item)
|
||
else:
|
||
print(f"警告: 第 {i+1} 个元素不是有效选题对象或缺少键: {item}")
|
||
|
||
else:
|
||
print(f"错误: 解析结果不是一个 JSON 数组 (List)。实际类型: {type(parsed_json)}")
|
||
|
||
except json.JSONDecodeError as e:
|
||
print(f"错误: 解析 JSON 失败 - {e}")
|
||
print("------ 无法解析的原始文本 (After potential <think> removal) ------")
|
||
print(potential_json_part) # Print the part we tried to parse
|
||
print("-------------------------------")
|
||
# 在失败时返回空列表
|
||
return []
|
||
except Exception as e: # Catch other potential errors
|
||
print(f"解析选题时发生意外错误: {e}")
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
print(f"最终成功解析选题数量:{len(result_list)}")
|
||
|
||
# (可选)保留索引重新分配逻辑,处理 AI 可能生成的无效或重复索引
|
||
if result_list:
|
||
print("重新分配和验证选题索引...")
|
||
used_indices = set()
|
||
# 第一步:尝试解析已有的index,如果是有效数字则保留
|
||
for item in result_list:
|
||
raw_index = item.get('index', '')
|
||
try:
|
||
index_value = int(raw_index.strip())
|
||
if index_value in used_indices:
|
||
item['index'] = None # Mark for reassignment
|
||
else:
|
||
item['index'] = str(index_value) # Keep valid, unique index
|
||
used_indices.add(index_value)
|
||
except (ValueError, TypeError):
|
||
item['index'] = None # Mark for reassignment
|
||
|
||
# 第二步:为所有无效或重复的index分配新值
|
||
next_available_index = 1
|
||
for item in result_list:
|
||
if item.get('index') is None:
|
||
while next_available_index in used_indices:
|
||
next_available_index += 1
|
||
item['index'] = str(next_available_index)
|
||
used_indices.add(next_available_index)
|
||
next_available_index += 1
|
||
print("选题索引已重新分配完毕。")
|
||
|
||
return result_list
|
||
|
||
@staticmethod
|
||
def save_topics(result_list, output_dir, run_id, result=None): # result is optional now
|
||
"""保存解析后的选题到JSON文件"""
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
json_path = os.path.join(output_dir, f"tweet_topic_{run_id}.json") # Consistent naming with README?
|
||
try:
|
||
# 使用标准json模块,但添加ensure_ascii=False确保正确处理Unicode
|
||
with open(json_path, "w", encoding="utf-8") as f:
|
||
json.dump(result_list, f, ensure_ascii=False, indent=4)
|
||
print(f"选题结果已保存到: {json_path}")
|
||
|
||
# 额外创建txt格式的输出
|
||
txt_path = os.path.join(output_dir, f"tweet_topic_{run_id}.txt")
|
||
with open(txt_path, "w", encoding="utf-8") as f:
|
||
f.write(f"# 选题列表 (run_id: {run_id})\n\n")
|
||
for topic in result_list:
|
||
f.write(f"## 选题 {topic.get('index', 'N/A')}\n")
|
||
f.write(f"- 日期: {topic.get('date', 'N/A')}\n")
|
||
f.write(f"- 对象: {topic.get('object', 'N/A')}\n")
|
||
f.write(f"- 产品: {topic.get('product', 'N/A')}\n")
|
||
f.write(f"- 产品策略: {topic.get('product_logic', 'N/A')}\n")
|
||
f.write(f"- 风格: {topic.get('style', 'N/A')}\n")
|
||
f.write(f"- 风格策略: {topic.get('style_logic', 'N/A')}\n")
|
||
f.write(f"- 目标受众: {topic.get('target_audience', 'N/A')}\n")
|
||
f.write(f"- 受众策略: {topic.get('target_audience_logic', 'N/A')}\n")
|
||
f.write(f"- 逻辑: {topic.get('logic', 'N/A')}\n\n")
|
||
print(f"选题文本版本已保存到: {txt_path}")
|
||
|
||
return True, json_path
|
||
except Exception as e:
|
||
print(f"错误: 保存选题 JSON 文件失败 - {e}")
|
||
traceback.print_exc()
|
||
# Log raw result if saving fails and result was provided
|
||
if result:
|
||
error_log_file = os.path.join(output_dir, f"error_log_{run_id}.txt")
|
||
try:
|
||
with open(error_log_file, "w", encoding="utf-8") as f:
|
||
f.write("无法解析或保存选题,原始内容如下:\n\n")
|
||
f.write(result)
|
||
print(f"原始 AI 输出已记录到: {error_log_file}")
|
||
except Exception as log_e:
|
||
print(f"错误: 记录原始输出失败 - {log_e}")
|
||
return False, None
|
||
|
||
@staticmethod
|
||
def load_topics_from_json(json_path):
|
||
"""从JSON文件加载选题列表"""
|
||
try:
|
||
with open(json_path, "r", encoding="utf-8") as f:
|
||
topics_list = json.load(f)
|
||
# Basic validation
|
||
if isinstance(topics_list, list):
|
||
print(f"从 {json_path} 加载了 {len(topics_list)} 个选题。")
|
||
return topics_list
|
||
else:
|
||
print(f"错误: {json_path} 中的内容不是一个有效的 JSON 数组。")
|
||
return None
|
||
except FileNotFoundError:
|
||
print(f"错误: 找不到选题文件 {json_path}")
|
||
return None
|
||
except json.JSONDecodeError as e:
|
||
print(f"错误: 解析选题文件 {json_path} 失败 - {e}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"加载选题文件时发生意外错误: {e}")
|
||
traceback.print_exc()
|
||
return None |