Compare commits

...

3 Commits

SHA1 Message Date
157d3348a6 Fixed the judger response issue 2025-07-10 16:15:13 +08:00
ae6801a7d5 Changed the JSON extraction module and added a shared common module 2025-07-10 16:10:14 +08:00
8feefeb053 require 2025-07-10 14:57:05 +08:00
41 changed files with 178 additions and 122 deletions

View File

@@ -99,11 +99,8 @@ class AIAgent:
time_cost = time.time() - start_time
if use_stream:
# Streaming mode does not return token counts or timing for now; that needs a more involved implementation
# An empty generator is returned here, but the actual logic lives in _process_stream
# To unify the return type, this part may need a redesign
# Simplified handling: in streaming mode, return the concatenated full text
full_text = "".join([chunk for chunk in self._process_stream(response)])
# Streaming responses require asynchronous iteration
full_text = await self._process_stream(response)
output_tokens = self.count_tokens(full_text)
logger.info(f"{stage_info} task completed in {time_cost:.2f}s. Output tokens: {output_tokens}")
return full_text, input_tokens, output_tokens, time_cost
@@ -130,16 +127,18 @@ class AIAgent:
raise AIModelError(f"AI model call failed after {self.config.max_retries} retries") from last_exception
def _process_stream(self, response):
"""Process the streaming response"""
async def _process_stream(self, response):
"""Asynchronously process the streaming response"""
full_response = []
for chunk in response:
async for chunk in response:
content = chunk.choices[0].delta.content
if content:
full_response.append(content)
yield content
# If true streaming were needed here, a callback or another mechanism could be used
logger.info(f"Streaming response received, total length: {len(''.join(full_response))}")
full_text = "".join(full_response)
logger.info(f"Streaming response received, total length: {len(full_text)}")
return full_text
def count_tokens(self, text: str) -> int:
"""

View File

@@ -222,7 +222,7 @@ async def generate_content_for_topic(self, topic: Dict[str, Any]) -> Dict[str, A
raw_result = await self.ai_agent.generate_text(...)
# Parse and save the results
content_data = json.loads(raw_result)
content_data = json_repair.loads(raw_result)
self.output_manager.save_json(content_data, "article.json", ...)
return content_data
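The move from `json.loads` to `json_repair.loads` here reflects that model output is often almost-valid JSON. A small illustration of the difference, with a hypothetical raw string (json_repair is generally tolerant of single quotes and trailing commas, which the standard parser rejects):

```python
import json
import json_repair

# Hypothetical LLM output: single quotes and a trailing comma
raw_result = "{'title': 'Autumn park guide', 'tags': ['travel', 'autumn',]}"

try:
    json.loads(raw_result)
except json.JSONDecodeError:
    pass  # the standard parser raises on this input

content_data = json_repair.loads(raw_result)  # repaired and parsed into a dict
print(content_data["tags"])
```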

View File

@@ -1,7 +1,12 @@
fastapi==0.115.12
numpy==2.2.5
openai==1.77.0
Pillow==11.2.1
json_repair==0.47.6
numpy==2.3.1
openai==1.93.3
opencv_python==4.11.0.86
opencv_python_headless==4.11.0.86
Pillow==11.3.0
psutil==6.1.0
pydantic==2.11.7
scikit_learn==1.7.0
scipy==1.16.0
simplejson==3.20.1
tiktoken==0.9.0
uvicorn==0.34.2
opencv-python

View File

@@ -8,13 +8,13 @@
import os
import sys
import json
import json_repair
import random
import asyncio
import argparse
import logging
import re
from datetime import datetime
# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -34,7 +34,7 @@ def read_data_file(file_path: str):
# Try to parse the content as standard JSON
try:
return json.loads(content)
return json_repair.loads(content)
except json.JSONDecodeError:
# If it is not standard JSON, try to parse it as a JSON-like format
logger.info(f"File {file_path} is not standard JSON, trying other parsing approaches...")
@@ -220,7 +220,7 @@ async def generate_info_with_llm(ai_agent: AIAgent, scenic_info: dict, product_i
if json_start >= 0 and json_end > json_start:
json_str = response[json_start:json_end]
content_dict = json.loads(json_str)
content_dict = json_repair.loads(json_str)
logger.info(f"LLM successfully generated content: {content_dict}")
# Add default button text and pagination info

View File

@@ -8,6 +8,7 @@
import os
import random
import json
import json_repair
import logging
from pathlib import Path
from typing import Optional, List, Dict, Any
@@ -38,7 +39,7 @@ class ResourceLoader:
if content is None:
return None
try:
return json.loads(content)
return json_repair.loads(content)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON file '{file_path}': {e}")
return None
@@ -133,4 +134,87 @@ class OutputManager:
"""Final operations when the run completes"""
logger.info(f"Finalizing run: {self.run_id}")
# No special handling for now, but reserved for future extensions such as creating a manifest file
pass
pass
def process_llm_json_text(text: Any) -> Optional[Dict[str, Any]]:
"""
Process a JSON string returned by an LLM, supporting several extraction strategies:
1. Extract the content after </think>
2. Extract the content between ```json and ```
3. Try to parse the whole text directly
4. Use json_repair to fix formatting issues
Args:
text: The raw text returned by the LLM, or an already-parsed object
Returns:
The parsed JSON object, or None if parsing fails
"""
# If the input is already a dict, return it as-is
if isinstance(text, dict):
return text
# If the input is a list but a dict is expected, return None
if isinstance(text, list):
logger.warning("Input is a list, but a dict was expected")
return None
# Ensure the input is a string
if not isinstance(text, str):
try:
text = str(text)
except Exception as e:
logger.error(f"Could not convert the input to a string: {e}")
return None
if not text or not text.strip():
logger.warning("Received an empty LLM response")
return None
# Collect candidate JSON texts
json_candidates = []
# 1. Try to extract the content after </think>
if "</think>" in text:
think_parts = text.split("</think>", 1)
if len(think_parts) > 1:
json_candidates.append(think_parts[1].strip())
# 2. Try to extract the content between ```json and ```
json_code_blocks = []
# Match the content between ```json (or a plain ```) and the closing ```
import re
json_blocks = re.findall(r"```(?:json)?\s*([\s\S]*?)```", text)
if json_blocks:
json_candidates.extend([block.strip() for block in json_blocks])
# 3. Try json_repair directly on the whole text
try:
return json_repair.loads(text)
except Exception:
pass
# 4. Add the raw text itself as a candidate
json_candidates.append(text.strip())
# Try to parse each candidate
for candidate in json_candidates:
# Try standard parsing first
try:
import json
return json.loads(candidate)
except json.JSONDecodeError:
pass
# Fall back to json_repair
try:
import json_repair
return json_repair.loads(candidate)
except Exception:
continue
# All attempts failed; log an error and return None
logger.error(f"Could not parse the JSON returned by the LLM after trying {len(json_candidates)} extraction strategies")
logger.debug(f"Raw response: {text[:200]}...")  # Log only the first 200 characters to keep log size down
return None
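A short usage sketch of the new shared helper (the inputs are hypothetical): an already-parsed dict is passed through unchanged, and a slightly malformed JSON string falls through to the json_repair path described above.

```python
# Dict input is returned as-is (see the isinstance check at the top of the helper)
assert process_llm_json_text({"title": "Autumn park guide"}) == {"title": "Autumn park guide"}

# Malformed JSON (single quotes, trailing comma) is repaired and parsed
data = process_llm_json_text("{'title': 'Autumn park guide', 'tags': ['travel',]}")
print(data)  # expected: {'title': 'Autumn park guide', 'tags': ['travel']}
```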

View File

@@ -9,7 +9,7 @@ import json
from typing import Dict, Any, Optional, List
from core.ai import AIAgent
from utils.file_io import ResourceLoader, process_llm_json_text
logger = logging.getLogger(__name__)
class PosterContentGenerator:
@@ -73,28 +73,9 @@ class PosterContentGenerator:
self.logger.error("The AI did not return any content.")
return None
# Preprocess and parse the JSON
return self._parse_json_response(raw_response)
# Use the shared JSON parsing helper to process the response
return process_llm_json_text(raw_response)
except Exception as e:
self.logger.error(f"A critical error occurred while calling the AI to generate copy: {e}", exc_info=True)
return None
def _parse_json_response(self, text: str) -> Optional[Dict[str, Any]]:
"""
Extract and parse JSON from the text returned by the AI
"""
try:
# Find the first '{' and the last '}' to extract the JSON string
start_index = text.find('{')
end_index = text.rfind('}')
if start_index != -1 and end_index != -1:
json_str = text[start_index : end_index + 1]
return json.loads(json_str)
else:
self.logger.error("No valid JSON object was found in the AI response.")
return None
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse the JSON returned by the AI: {e}")
self.logger.debug(f"Raw response text: {text}")
return None

View File

@@ -12,7 +12,7 @@ from typing import Dict, Any
from core.ai import AIAgent
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
from utils.prompts import ContentPromptBuilder
from utils.file_io import OutputManager
from utils.file_io import OutputManager, process_llm_json_text
logger = logging.getLogger(__name__)
@@ -67,7 +67,7 @@ class ContentGenerator:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=False,
use_stream=True,
stage="content generation",
**model_params
)
@@ -77,12 +77,11 @@
return {"error": str(e)}
# 3. Parse and save the results
try:
# Assume the result is a JSON string
content_data = json.loads(raw_result)
content_data = process_llm_json_text(raw_result)
if content_data:
self.output_manager.save_json(content_data, "article.json", subdir=output_dir.name)
logger.info(f"Successfully generated and saved content for topic {topic_index}.")
return content_data
except json.JSONDecodeError as e:
logger.error(f"Failed to parse content JSON for {topic_index}: {e}")
else:
logger.error(f"Failed to parse content JSON for {topic_index}")
return {"error": "JSONDecodeError", "raw_content": raw_result}

View File

@@ -7,12 +7,12 @@
import logging
import json
from json_repair import loads as json_repair_loads
from typing import Dict, Any
from typing import Dict, Any, Union
from core.ai import AIAgent
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
from utils.prompts import JudgerPromptBuilder
from utils.file_io import process_llm_json_text
logger = logging.getLogger(__name__)
@@ -36,12 +36,12 @@ class ContentJudger:
self.prompt_builder = JudgerPromptBuilder(config_manager)
self.output_manager = output_manager
async def judge_content(self, generated_content: str, topic: Dict[str, Any]) -> Dict[str, Any]:
async def judge_content(self, generated_content: Union[str, Dict[str, Any]], topic: Dict[str, Any]) -> Dict[str, Any]:
"""
Call the AI to review the generated content
Args:
generated_content: The previously generated raw content (as a JSON string)
generated_content: The previously generated raw content (as a JSON string or a dict)
topic: The original topic dict associated with the content
Returns:
@@ -55,18 +55,23 @@
# Extract tags from the original content
original_tags = []
try:
original_content = json_repair_loads(generated_content)
if isinstance(original_content, dict) and "tags" in original_content:
original_tags = original_content.get("tags", [])
logger.info(f"Extracted tags from the original content: {original_tags}")
except Exception as e:
logger.warning(f"Failed to extract tags from the original content: {e}")
original_content = process_llm_json_text(generated_content)
if original_content and isinstance(original_content, dict) and "tags" in original_content:
original_tags = original_content.get("tags", [])
logger.info(f"Extracted tags from the original content: {original_tags}")
else:
logger.warning("Failed to extract tags from the original content")
# Convert the dict to a JSON string for use in the prompt
if isinstance(generated_content, dict):
generated_content_str = json.dumps(generated_content, ensure_ascii=False, indent=2)
else:
generated_content_str = str(generated_content)
# 1. Build the prompts
system_prompt = self.prompt_builder.get_system_prompt()
user_prompt = self.prompt_builder.build_user_prompt(
generated_content=generated_content,
generated_content=generated_content_str,
topic=topic
)
@@ -91,7 +96,7 @@
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=False,
use_stream=True,
stage="content review",
**model_params
)
@@ -105,25 +110,21 @@
return {"judge_success": False, "error": str(e)}
# 3. Parse the result
try:
judged_data = json_repair_loads(raw_result)
if isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
judged_data["judge_success"] = True
judged_data = process_llm_json_text(raw_result)
if judged_data and isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
judged_data["judge_success"] = True
# Use the tags from the original content directly
if original_tags:
judged_data["tags"] = original_tags
# If the original content has no tags, the default tags are used
logger.info(f"Content review completed successfully, using tags: {judged_data.get('tags', [])}")
# Save the reviewed content
if self.output_manager:
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
# Use the tags from the original content directly
if original_tags:
judged_data["tags"] = original_tags
# If the original content has no tags, the default tags are used
logger.info(f"Content review completed successfully, using tags: {judged_data.get('tags', [])}")
# Save the reviewed content
if self.output_manager:
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
return judged_data
else:
logger.warning(f"The review response JSON is malformed or missing required keys: {judged_data}")
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse the review response JSON: {e}")
return {"judge_success": False, "error": "JSONDecodeError", "raw_response": raw_result}
return judged_data
else:
logger.warning("The review response JSON is malformed or missing required keys")
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}

View File

@@ -71,7 +71,7 @@ class TopicGenerator:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=False, # Topic generation usually does not need streaming output
use_stream=True, # Topic generation usually does not need streaming output
stage="topic generation",
**model_params
)

View File

@@ -7,9 +7,8 @@ AI response parser module
import logging
import json
import re
from typing import List, Dict, Any
from json_repair import loads as json_repair_loads
from utils.file_io import process_llm_json_text
logger = logging.getLogger(__name__)
@@ -32,40 +31,28 @@ class TopicParser:
"""
logger.info("Starting to parse AI-generated topics...")
# 1. Remove the AI's <think> reasoning block
if "</think>" in raw_text:
raw_text = raw_text.split("</think>", 1)[-1]
# Use the shared JSON parsing helper to parse the raw text
parsed_json = process_llm_json_text(raw_text)
# 2. Remove Markdown code-block markers (e.g., ```json ... ```)
cleaned_text = re.sub(r'```json\s*|\s*```', '', raw_text.strip(), flags=re.MULTILINE)
if not cleaned_text:
logger.error("Content to parse is empty after removing metadata")
if not parsed_json:
logger.error("Failed to parse the AI response, no JSON data could be obtained")
return []
try:
# 3. Use json_repair to fix potentially malformed JSON and parse it
parsed_json = json_repair_loads(cleaned_text)
if not isinstance(parsed_json, list):
logger.error(f"Parsed result is not a list but {type(parsed_json)}")
return []
logger.info(f"Successfully parsed {len(parsed_json)} topic objects. Starting validation...")
# 4. Verify that each topic contains all required keys
valid_topics = []
required_keys = {"index", "date", "logic", "object", "product", "style", "target_audience"}
for i, item in enumerate(parsed_json):
if isinstance(item, dict) and required_keys.issubset(item.keys()):
valid_topics.append(item)
else:
logger.warning(f"Topic {i+1} is missing required keys or is malformed: {item}")
logger.info(f"Validation complete, {len(valid_topics)} valid topics obtained.")
return valid_topics
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse JSON: {e}", exc_info=True)
return []
if not isinstance(parsed_json, list):
logger.error(f"Parsed result is not a list but {type(parsed_json)}")
return []
logger.info(f"Successfully parsed {len(parsed_json)} topic objects. Starting validation...")
# Verify that each topic contains all required keys
valid_topics = []
required_keys = {"index", "date", "logic", "object", "product", "style", "target_audience"}
for i, item in enumerate(parsed_json):
if isinstance(item, dict) and required_keys.issubset(item.keys()):
valid_topics.append(item)
else:
logger.warning(f"Topic {i+1} is missing required keys or is malformed: {item}")
logger.info(f"Validation complete, {len(valid_topics)} valid topics obtained.")
return valid_topics
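The required-key validation kept by the refactor can be exercised on its own; a standalone sketch with hypothetical topic items:

```python
required_keys = {"index", "date", "logic", "object", "product", "style", "target_audience"}

parsed_json = [
    {"index": 1, "date": "2025-07-11", "logic": "seasonal hook", "object": "West Lake",
     "product": "one-day tour", "style": "casual", "target_audience": "families"},
    {"index": 2, "date": "2025-07-12"},  # missing keys, so it is filtered out with a warning
]

valid_topics = [item for item in parsed_json
                if isinstance(item, dict) and required_keys.issubset(item.keys())]
assert [t["index"] for t in valid_topics] == [1]
```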