import re
import json
import os
import traceback # Import traceback for better error logging
from datetime import datetime
class TopicParser:
"""选题解析器类,负责解析和处理选题"""
@staticmethod
def parse_topics(result, run_id=None):
"""解析 AI 返回的 JSON 格式的选题列表"""
# 如果没有提供run_id,生成一个新的
if run_id is None:
run_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# 保存原始LLM响应
save_dir = os.path.join("result", run_id)
os.makedirs(save_dir, exist_ok=True)
raw_response_path = os.path.join(save_dir, f"raw_llm_response_{run_id}.txt")
try:
with open(raw_response_path, "w", encoding="utf-8") as f:
f.write(result)
print(f"原始LLM响应已保存到:{raw_response_path}")
except Exception as e:
print(f"保存原始LLM响应失败:{e}")
# --- Debug: Print raw input ---
print("--- Raw Input to parse_topics ---")
print(repr(result)) # Use repr() to see special characters like \n
print("--- End Raw Input ---")
# --- End Debug ---
print("\n开始解析 JSON 格式的选题结果...")
result_list = []
try:
# --- Try to remove block first ---
potential_json_part = result # Start with the original result
if "" in result:
parts = result.split("", 1) # Split only once
if len(parts) > 1:
print("检测到并移除了 '' 标签前的部分。")
potential_json_part = parts[1] # Take the part after
# --- End of removal ---
# 尝试直接将结果(或之后的部分)解析为 JSON
# 先移除可能的Markdown代码块标记 ```json ... ```
cleaned_result = re.sub(r'^```json\s*|\s*```$', '', potential_json_part.strip(), flags=re.MULTILINE) # Apply to potential_json_part
if not cleaned_result: # Handle case where everything was removed
print("错误:移除 和/或 markdown 标记后内容为空。")
return []
parsed_json = json.loads(cleaned_result)
# 验证解析结果是否为列表
if isinstance(parsed_json, list):
print(f"成功解析 JSON,包含 {len(parsed_json)} 个潜在选题。")
required_keys = {"index", "date", "logic", "object", "product", "product_logic", "style", "style_logic", "target_audience", "target_audience_logic"}
for i, item in enumerate(parsed_json):
# 验证每个元素是否为字典并包含所有必需的键
if isinstance(item, dict) and required_keys.issubset(item.keys()):
# 确保所有值都是字符串,以防 AI 返回非字符串类型
valid_item = {}
for k, v in item.items():
# 转换为字符串,并检查文件扩展名
str_v = str(v)
# 如果值包含文件扩展名(如.txt, .json, .md等),可能是路径错误
if k in ["style", "target_audience", "product"] and (str_v.endswith('.txt') or
str_v.endswith('.json') or
str_v.endswith('.md')):
print(f"警告:检测到 {k} 字段可能被错误赋值为文件路径:{str_v}")
# 仅保留文件名(不含扩展名)作为值
base_name = os.path.splitext(os.path.basename(str_v))[0]
print(f" 将其修正为:{base_name}")
valid_item[str(k)] = base_name
else:
valid_item[str(k)] = str_v
# 添加 'error' 字段以兼容旧接口(如果需要)
valid_item['error'] = False
result_list.append(valid_item)
else:
print(f"警告: 第 {i+1} 个元素不是有效选题对象或缺少键: {item}")
else:
print(f"错误: 解析结果不是一个 JSON 数组 (List)。实际类型: {type(parsed_json)}")
except json.JSONDecodeError as e:
print(f"错误: 解析 JSON 失败 - {e}")
print("------ 无法解析的原始文本 (After potential removal) ------")
print(potential_json_part) # Print the part we tried to parse
print("-------------------------------")
# 在失败时返回空列表
return []
except Exception as e: # Catch other potential errors
print(f"解析选题时发生意外错误: {e}")
traceback.print_exc()
return []
print(f"最终成功解析选题数量:{len(result_list)}")
# (可选)保留索引重新分配逻辑,处理 AI 可能生成的无效或重复索引
if result_list:
print("重新分配和验证选题索引...")
used_indices = set()
# 第一步:尝试解析已有的index,如果是有效数字则保留
for item in result_list:
raw_index = item.get('index', '')
try:
index_value = int(raw_index.strip())
if index_value in used_indices:
item['index'] = None # Mark for reassignment
else:
item['index'] = str(index_value) # Keep valid, unique index
used_indices.add(index_value)
except (ValueError, TypeError):
item['index'] = None # Mark for reassignment
# 第二步:为所有无效或重复的index分配新值
next_available_index = 1
for item in result_list:
if item.get('index') is None:
while next_available_index in used_indices:
next_available_index += 1
item['index'] = str(next_available_index)
used_indices.add(next_available_index)
next_available_index += 1
print("选题索引已重新分配完毕。")
return result_list
@staticmethod
def save_topics(result_list, output_dir, run_id, result=None): # result is optional now
"""保存解析后的选题到JSON文件"""
os.makedirs(output_dir, exist_ok=True)
json_path = os.path.join(output_dir, f"tweet_topic_{run_id}.json") # Consistent naming with README?
try:
# 使用标准json模块,但添加ensure_ascii=False确保正确处理Unicode
with open(json_path, "w", encoding="utf-8") as f:
json.dump(result_list, f, ensure_ascii=False, indent=4)
print(f"选题结果已保存到: {json_path}")
# 额外创建txt格式的输出
txt_path = os.path.join(output_dir, f"tweet_topic_{run_id}.txt")
with open(txt_path, "w", encoding="utf-8") as f:
f.write(f"# 选题列表 (run_id: {run_id})\n\n")
for topic in result_list:
f.write(f"## 选题 {topic.get('index', 'N/A')}\n")
f.write(f"- 日期: {topic.get('date', 'N/A')}\n")
f.write(f"- 对象: {topic.get('object', 'N/A')}\n")
f.write(f"- 产品: {topic.get('product', 'N/A')}\n")
f.write(f"- 产品策略: {topic.get('product_logic', 'N/A')}\n")
f.write(f"- 风格: {topic.get('style', 'N/A')}\n")
f.write(f"- 风格策略: {topic.get('style_logic', 'N/A')}\n")
f.write(f"- 目标受众: {topic.get('target_audience', 'N/A')}\n")
f.write(f"- 受众策略: {topic.get('target_audience_logic', 'N/A')}\n")
f.write(f"- 逻辑: {topic.get('logic', 'N/A')}\n\n")
print(f"选题文本版本已保存到: {txt_path}")
return True, json_path
except Exception as e:
print(f"错误: 保存选题 JSON 文件失败 - {e}")
traceback.print_exc()
# Log raw result if saving fails and result was provided
if result:
error_log_file = os.path.join(output_dir, f"error_log_{run_id}.txt")
try:
with open(error_log_file, "w", encoding="utf-8") as f:
f.write("无法解析或保存选题,原始内容如下:\n\n")
f.write(result)
print(f"原始 AI 输出已记录到: {error_log_file}")
except Exception as log_e:
print(f"错误: 记录原始输出失败 - {log_e}")
return False, None
@staticmethod
def load_topics_from_json(json_path):
"""从JSON文件加载选题列表"""
try:
with open(json_path, "r", encoding="utf-8") as f:
topics_list = json.load(f)
# Basic validation
if isinstance(topics_list, list):
print(f"从 {json_path} 加载了 {len(topics_list)} 个选题。")
return topics_list
else:
print(f"错误: {json_path} 中的内容不是一个有效的 JSON 数组。")
return None
except FileNotFoundError:
print(f"错误: 找不到选题文件 {json_path}")
return None
except json.JSONDecodeError as e:
print(f"错误: 解析选题文件 {json_path} 失败 - {e}")
return None
except Exception as e:
print(f"加载选题文件时发生意外错误: {e}")
traceback.print_exc()
return None