Compare commits
No commits in common. "66bfad2f3c96babaffffa594eb6aa2fbf48a48cd" and "740abd06a1f0b1f599a0895d7c82fa22e429c22a" have entirely different histories.
66bfad2f3c
...
740abd06a1
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -49,40 +49,15 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
|
||||
try:
|
||||
# 创建临时文件处理base64文档
|
||||
if request.documents:
|
||||
temp_files = []
|
||||
for doc in request.documents:
|
||||
try:
|
||||
# 从base64内容中提取实际内容(跳过data:image/jpeg;base64,这样的前缀)
|
||||
content = doc.content
|
||||
if ',' in content:
|
||||
content = content.split(',', 1)[1]
|
||||
|
||||
# 创建临时文件
|
||||
suffix = os.path.splitext(doc.filename)[1]
|
||||
if not suffix:
|
||||
# 根据MIME类型推断后缀
|
||||
mime_to_ext = {
|
||||
'text/plain': '.txt',
|
||||
'application/pdf': '.pdf',
|
||||
'application/msword': '.doc',
|
||||
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
|
||||
'image/jpeg': '.jpg',
|
||||
'image/png': '.png'
|
||||
}
|
||||
suffix = mime_to_ext.get(doc.mime_type, '.bin')
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(doc.filename)[1]) as temp_file:
|
||||
# 解码base64内容并写入临时文件
|
||||
try:
|
||||
decoded_content = base64.b64decode(content)
|
||||
temp_file.write(decoded_content)
|
||||
content = base64.b64decode(doc.content)
|
||||
temp_file.write(content)
|
||||
temp_files.append(temp_file.name)
|
||||
logger.info(f"成功保存临时文件: {temp_file.name}")
|
||||
except Exception as e:
|
||||
logger.error(f"Base64解码失败: {e}")
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"文档 {doc.filename} 的Base64内容无效: {str(e)}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"处理文档 {doc.filename} 失败: {e}")
|
||||
raise HTTPException(
|
||||
@ -95,8 +70,8 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
|
||||
# 调用服务层处理
|
||||
result = await integration_service.integrate_content(
|
||||
document_paths=temp_files,
|
||||
keywords=request.keywords or [],
|
||||
cookies=request.cookies or "",
|
||||
keywords=request.keywords,
|
||||
cookies=request.cookies,
|
||||
sort_type=request.sort_type,
|
||||
note_type=request.note_type,
|
||||
note_time=request.note_time,
|
||||
@ -105,7 +80,36 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
|
||||
query_num=request.query_num
|
||||
)
|
||||
|
||||
return result
|
||||
# 转换为响应模型
|
||||
if result["success"]:
|
||||
response = ContentIntegrationResponse(
|
||||
success=True,
|
||||
timestamp=result["timestamp"],
|
||||
processing_time=result["processing_time"],
|
||||
input_summary=result["input_summary"],
|
||||
document_info=result["document_info"],
|
||||
xhs_info=result["xhs_info"],
|
||||
integrated_content=result["integrated_content"],
|
||||
search_config=result["search_config"],
|
||||
error_message=None # 成功时无错误信息
|
||||
)
|
||||
logger.info(f"内容整合成功,处理时间:{result['processing_time']}")
|
||||
else:
|
||||
from datetime import datetime
|
||||
response = ContentIntegrationResponse(
|
||||
success=False,
|
||||
timestamp=result.get("timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")),
|
||||
processing_time=result.get("processing_time", "0秒"),
|
||||
input_summary=result.get("input_summary"),
|
||||
document_info=result.get("document_info"),
|
||||
xhs_info=result.get("xhs_info"),
|
||||
integrated_content=result.get("integrated_content"),
|
||||
search_config=result.get("search_config"),
|
||||
error_message=result.get("error_message")
|
||||
)
|
||||
logger.error(f"内容整合失败:{result['error_message']}")
|
||||
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"内容整合接口异常:{e}", exc_info=True)
|
||||
|
||||
@ -125,96 +125,38 @@ def _add_ids_to_topics(topics: List[Dict[str, Any]], id_name_mappings: Dict[str,
|
||||
Returns:
|
||||
包含ID字段的选题列表
|
||||
"""
|
||||
def find_best_match(target_name: str, mapping: Dict[str, int]) -> Optional[int]:
|
||||
"""
|
||||
寻找最佳匹配的ID,支持模糊匹配
|
||||
"""
|
||||
if not target_name or not mapping:
|
||||
return None
|
||||
|
||||
# 1. 精确匹配
|
||||
if target_name in mapping:
|
||||
return mapping[target_name]
|
||||
|
||||
# 2. 模糊匹配 - 去除空格后匹配
|
||||
target_clean = target_name.replace(" ", "").strip()
|
||||
for name, id_val in mapping.items():
|
||||
if name.replace(" ", "").strip() == target_clean:
|
||||
logger.info(f"模糊匹配成功: '{target_name}' -> '{name}' (ID: {id_val})")
|
||||
return id_val
|
||||
|
||||
# 3. 包含匹配 - 检查是否互相包含
|
||||
for name, id_val in mapping.items():
|
||||
if target_clean in name.replace(" ", "") or name.replace(" ", "") in target_clean:
|
||||
logger.info(f"包含匹配成功: '{target_name}' -> '{name}' (ID: {id_val})")
|
||||
return id_val
|
||||
|
||||
# 4. 未找到匹配
|
||||
logger.warning(f"未找到匹配的ID: '{target_name}', 可用选项: {list(mapping.keys())}")
|
||||
return None
|
||||
|
||||
enriched_topics = []
|
||||
|
||||
for topic in topics:
|
||||
# 复制原topic
|
||||
enriched_topic = topic.copy()
|
||||
|
||||
# 初始化ID字段
|
||||
# 添加ID字段
|
||||
enriched_topic['styleIds'] = []
|
||||
enriched_topic['audienceIds'] = []
|
||||
enriched_topic['scenicSpotIds'] = []
|
||||
enriched_topic['productIds'] = []
|
||||
|
||||
# 记录匹配结果
|
||||
match_results = {
|
||||
'style_matched': False,
|
||||
'audience_matched': False,
|
||||
'scenic_spot_matched': False,
|
||||
'product_matched': False
|
||||
}
|
||||
|
||||
# 根据topic中的name查找对应的ID
|
||||
if 'style' in topic and topic['style']:
|
||||
style_id = find_best_match(topic['style'], id_name_mappings['style_mapping'])
|
||||
if style_id:
|
||||
enriched_topic['styleIds'] = [style_id]
|
||||
match_results['style_matched'] = True
|
||||
style_name = topic['style']
|
||||
if style_name in id_name_mappings['style_mapping']:
|
||||
enriched_topic['styleIds'] = [id_name_mappings['style_mapping'][style_name]]
|
||||
|
||||
if 'targetAudience' in topic and topic['targetAudience']:
|
||||
audience_id = find_best_match(topic['targetAudience'], id_name_mappings['audience_mapping'])
|
||||
if audience_id:
|
||||
enriched_topic['audienceIds'] = [audience_id]
|
||||
match_results['audience_matched'] = True
|
||||
audience_name = topic['targetAudience']
|
||||
if audience_name in id_name_mappings['audience_mapping']:
|
||||
enriched_topic['audienceIds'] = [id_name_mappings['audience_mapping'][audience_name]]
|
||||
|
||||
if 'object' in topic and topic['object']:
|
||||
spot_id = find_best_match(topic['object'], id_name_mappings['scenic_spot_mapping'])
|
||||
if spot_id:
|
||||
enriched_topic['scenicSpotIds'] = [spot_id]
|
||||
match_results['scenic_spot_matched'] = True
|
||||
spot_name = topic['object']
|
||||
if spot_name in id_name_mappings['scenic_spot_mapping']:
|
||||
enriched_topic['scenicSpotIds'] = [id_name_mappings['scenic_spot_mapping'][spot_name]]
|
||||
|
||||
if 'product' in topic and topic['product']:
|
||||
product_id = find_best_match(topic['product'], id_name_mappings['product_mapping'])
|
||||
if product_id:
|
||||
enriched_topic['productIds'] = [product_id]
|
||||
match_results['product_matched'] = True
|
||||
|
||||
# 记录匹配情况
|
||||
total_fields = sum(1 for key in ['style', 'targetAudience', 'object', 'product'] if key in topic and topic[key])
|
||||
matched_fields = sum(match_results.values())
|
||||
|
||||
if total_fields > 0:
|
||||
match_rate = matched_fields / total_fields * 100
|
||||
logger.info(f"选题 {topic.get('index', 'N/A')} ID匹配率: {match_rate:.1f}% ({matched_fields}/{total_fields})")
|
||||
|
||||
# 如果匹配率低于50%,记录警告
|
||||
if match_rate < 50:
|
||||
logger.warning(f"选题 {topic.get('index', 'N/A')} ID匹配率较低: {match_rate:.1f}%")
|
||||
|
||||
# 添加匹配元数据
|
||||
# enriched_topic['_id_match_metadata'] = {
|
||||
# 'match_results': match_results,
|
||||
# 'match_rate': matched_fields / max(total_fields, 1) * 100 if total_fields > 0 else 0
|
||||
# }
|
||||
product_name = topic['product']
|
||||
if product_name in id_name_mappings['product_mapping']:
|
||||
enriched_topic['productIds'] = [id_name_mappings['product_mapping'][product_name]]
|
||||
|
||||
enriched_topics.append(enriched_topic)
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -78,8 +78,7 @@ class ContentIntegrationService:
|
||||
整合结果字典
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords) if keywords else 0}")
|
||||
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords)}")
|
||||
|
||||
try:
|
||||
# 确保输出目录存在
|
||||
|
||||
@ -55,23 +55,10 @@ class DatabaseService:
|
||||
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
|
||||
|
||||
# 创建连接池
|
||||
# 从配置中分离MySQL连接池支持的参数和不支持的参数
|
||||
config = attempt["config"].copy()
|
||||
|
||||
# MySQL连接池不支持的参数,需要移除
|
||||
unsupported_params = [
|
||||
'max_retry_attempts', 'query_timeout', 'soft_delete_field', 'active_record_value'
|
||||
]
|
||||
for param in unsupported_params:
|
||||
config.pop(param, None)
|
||||
|
||||
# 设置连接池参数,使用配置文件中的值或默认值
|
||||
pool_size = config.pop('pool_size', 10)
|
||||
|
||||
pool = pooling.MySQLConnectionPool(
|
||||
pool_name=f"database_service_pool_{int(time.time())}",
|
||||
pool_size=pool_size,
|
||||
**config
|
||||
pool_size=10,
|
||||
**attempt["config"]
|
||||
)
|
||||
|
||||
# 测试连接
|
||||
@ -223,8 +210,7 @@ class DatabaseService:
|
||||
景区信息列表
|
||||
"""
|
||||
if not self.db_pool or not spot_ids:
|
||||
logger.warning("数据库连接池未初始化或景区ID列表为空")
|
||||
return self._get_fallback_scenic_spots(spot_ids)
|
||||
return []
|
||||
|
||||
try:
|
||||
with self.db_pool.get_connection() as conn:
|
||||
@ -234,42 +220,12 @@ class DatabaseService:
|
||||
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
|
||||
cursor.execute(query, spot_ids)
|
||||
results = cursor.fetchall()
|
||||
|
||||
# 检查是否所有ID都找到了对应的记录
|
||||
found_ids = {result['id'] for result in results}
|
||||
missing_ids = set(spot_ids) - found_ids
|
||||
|
||||
if missing_ids:
|
||||
logger.warning(f"部分景区ID未找到: {missing_ids}")
|
||||
# 添加兜底数据
|
||||
fallback_spots = self._get_fallback_scenic_spots(list(missing_ids))
|
||||
results.extend(fallback_spots)
|
||||
|
||||
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len([r for r in results if r['id'] in spot_ids])}个")
|
||||
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len(results)}个")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量查询景区信息失败: {e}")
|
||||
return self._get_fallback_scenic_spots(spot_ids)
|
||||
|
||||
def _get_fallback_scenic_spots(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取景区的兜底数据
|
||||
"""
|
||||
fallback_spots = []
|
||||
for spot_id in spot_ids:
|
||||
fallback_spots.append({
|
||||
'id': spot_id,
|
||||
'name': f'景区{spot_id}',
|
||||
'address': '默认地址',
|
||||
'advantage': '优美的自然风光',
|
||||
'highlight': '值得一游的景点',
|
||||
'isPublic': 1,
|
||||
'isDelete': 0,
|
||||
'_is_fallback': True
|
||||
})
|
||||
logger.info(f"使用兜底景区数据: {len(fallback_spots)}个")
|
||||
return fallback_spots
|
||||
return []
|
||||
|
||||
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
@ -282,8 +238,7 @@ class DatabaseService:
|
||||
产品信息列表
|
||||
"""
|
||||
if not self.db_pool or not productIds:
|
||||
logger.warning("数据库连接池未初始化或产品ID列表为空")
|
||||
return self._get_fallback_products(productIds)
|
||||
return []
|
||||
|
||||
try:
|
||||
with self.db_pool.get_connection() as conn:
|
||||
@ -293,42 +248,12 @@ class DatabaseService:
|
||||
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
|
||||
cursor.execute(query, productIds)
|
||||
results = cursor.fetchall()
|
||||
|
||||
# 检查是否所有ID都找到了对应的记录
|
||||
found_ids = {result['id'] for result in results}
|
||||
missing_ids = set(productIds) - found_ids
|
||||
|
||||
if missing_ids:
|
||||
logger.warning(f"部分产品ID未找到: {missing_ids}")
|
||||
# 添加兜底数据
|
||||
fallback_products = self._get_fallback_products(list(missing_ids))
|
||||
results.extend(fallback_products)
|
||||
|
||||
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len([r for r in results if r['id'] in productIds])}个")
|
||||
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len(results)}个")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量查询产品信息失败: {e}")
|
||||
return self._get_fallback_products(productIds)
|
||||
|
||||
def _get_fallback_products(self, productIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取产品的兜底数据
|
||||
"""
|
||||
fallback_products = []
|
||||
for product_id in productIds:
|
||||
fallback_products.append({
|
||||
'id': product_id,
|
||||
'productName': f'产品{product_id}',
|
||||
'originPrice': 999,
|
||||
'realPrice': 888,
|
||||
'packageInfo': '包含景区门票和导游服务',
|
||||
'advantage': '性价比高,服务优质',
|
||||
'isDelete': 0,
|
||||
'_is_fallback': True
|
||||
})
|
||||
logger.info(f"使用兜底产品数据: {len(fallback_products)}个")
|
||||
return fallback_products
|
||||
return []
|
||||
|
||||
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
@ -341,8 +266,7 @@ class DatabaseService:
|
||||
风格信息列表
|
||||
"""
|
||||
if not self.db_pool or not styleIds:
|
||||
logger.warning("数据库连接池未初始化或风格ID列表为空")
|
||||
return self._get_fallback_styles(styleIds)
|
||||
return []
|
||||
|
||||
try:
|
||||
with self.db_pool.get_connection() as conn:
|
||||
@ -352,39 +276,12 @@ class DatabaseService:
|
||||
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
|
||||
cursor.execute(query, styleIds)
|
||||
results = cursor.fetchall()
|
||||
|
||||
# 检查是否所有ID都找到了对应的记录
|
||||
found_ids = {result['id'] for result in results}
|
||||
missing_ids = set(styleIds) - found_ids
|
||||
|
||||
if missing_ids:
|
||||
logger.warning(f"部分风格ID未找到: {missing_ids}")
|
||||
# 添加兜底数据
|
||||
fallback_styles = self._get_fallback_styles(list(missing_ids))
|
||||
results.extend(fallback_styles)
|
||||
|
||||
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len([r for r in results if r['id'] in styleIds])}个")
|
||||
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len(results)}个")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量查询风格信息失败: {e}")
|
||||
return self._get_fallback_styles(styleIds)
|
||||
|
||||
def _get_fallback_styles(self, styleIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取风格的兜底数据
|
||||
"""
|
||||
fallback_styles = []
|
||||
for style_id in styleIds:
|
||||
fallback_styles.append({
|
||||
'id': style_id,
|
||||
'styleName': f'风格{style_id}',
|
||||
'description': '默认风格描述',
|
||||
'isDelete': 0,
|
||||
'_is_fallback': True
|
||||
})
|
||||
logger.info(f"使用兜底风格数据: {len(fallback_styles)}个")
|
||||
return fallback_styles
|
||||
return []
|
||||
|
||||
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
@ -397,8 +294,7 @@ class DatabaseService:
|
||||
受众信息列表
|
||||
"""
|
||||
if not self.db_pool or not audienceIds:
|
||||
logger.warning("数据库连接池未初始化或受众ID列表为空")
|
||||
return self._get_fallback_audiences(audienceIds)
|
||||
return []
|
||||
|
||||
try:
|
||||
with self.db_pool.get_connection() as conn:
|
||||
@ -408,39 +304,12 @@ class DatabaseService:
|
||||
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
|
||||
cursor.execute(query, audienceIds)
|
||||
results = cursor.fetchall()
|
||||
|
||||
# 检查是否所有ID都找到了对应的记录
|
||||
found_ids = {result['id'] for result in results}
|
||||
missing_ids = set(audienceIds) - found_ids
|
||||
|
||||
if missing_ids:
|
||||
logger.warning(f"部分受众ID未找到: {missing_ids}")
|
||||
# 添加兜底数据
|
||||
fallback_audiences = self._get_fallback_audiences(list(missing_ids))
|
||||
results.extend(fallback_audiences)
|
||||
|
||||
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len([r for r in results if r['id'] in audienceIds])}个")
|
||||
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len(results)}个")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量查询受众信息失败: {e}")
|
||||
return self._get_fallback_audiences(audienceIds)
|
||||
|
||||
def _get_fallback_audiences(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取受众的兜底数据
|
||||
"""
|
||||
fallback_audiences = []
|
||||
for audience_id in audienceIds:
|
||||
fallback_audiences.append({
|
||||
'id': audience_id,
|
||||
'audienceName': f'受众{audience_id}',
|
||||
'description': '默认受众描述',
|
||||
'isDelete': 0,
|
||||
'_is_fallback': True
|
||||
})
|
||||
logger.info(f"使用兜底受众数据: {len(fallback_audiences)}个")
|
||||
return fallback_audiences
|
||||
return []
|
||||
|
||||
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
|
||||
@ -114,23 +114,10 @@ class PromptService:
|
||||
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
|
||||
|
||||
# 创建连接池
|
||||
# 从配置中分离MySQL连接池支持的参数和不支持的参数
|
||||
config = attempt["config"].copy()
|
||||
|
||||
# MySQL连接池不支持的参数,需要移除
|
||||
unsupported_params = [
|
||||
'max_retry_attempts', 'query_timeout', 'soft_delete_field', 'active_record_value'
|
||||
]
|
||||
for param in unsupported_params:
|
||||
config.pop(param, None)
|
||||
|
||||
# 设置连接池参数,使用配置文件中的值或默认值
|
||||
pool_size = config.pop('pool_size', 5)
|
||||
|
||||
pool = pooling.MySQLConnectionPool(
|
||||
pool_name=f"prompt_service_pool_{int(time.time())}",
|
||||
pool_size=pool_size,
|
||||
**config
|
||||
pool_size=5,
|
||||
**attempt["config"]
|
||||
)
|
||||
|
||||
# 测试连接
|
||||
|
||||
@ -19,7 +19,6 @@ from tweet.content_generator import ContentGenerator
|
||||
from tweet.content_judger import ContentJudger
|
||||
from api.services.prompt_builder import PromptBuilderService
|
||||
from api.services.prompt_service import PromptService
|
||||
from api.services.database_service import DatabaseService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -110,21 +109,22 @@ class TweetService:
|
||||
logger.info(f"选题生成完成,请求ID: {requestId}, 数量: {len(topics)}")
|
||||
return requestId, topics
|
||||
|
||||
async def generate_content(self, topic: Optional[Dict[str, Any]] = None, autoJudge: bool = False,
|
||||
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
|
||||
style_objects: Optional[List[Dict[str, Any]]] = None,
|
||||
audience_objects: Optional[List[Dict[str, Any]]] = None,
|
||||
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
|
||||
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str, Dict[str, Any]]:
|
||||
product_objects: Optional[List[Dict[str, Any]]] = None,
|
||||
autoJudge: bool = False) -> Tuple[str, str, Dict[str, Any]]:
|
||||
"""
|
||||
为单个选题生成内容
|
||||
为选题生成内容
|
||||
|
||||
Args:
|
||||
topic: 选题信息(可能包含ID字段)
|
||||
autoJudge: 是否进行内嵌审核
|
||||
style_objects: 风格对象列表(可选,用于兼容)
|
||||
audience_objects: 受众对象列表(可选,用于兼容)
|
||||
scenic_spot_objects: 景区对象列表(可选,用于兼容)
|
||||
product_objects: 产品对象列表(可选,用于兼容)
|
||||
topic: 选题信息
|
||||
styles: 风格列表
|
||||
audiences: 受众列表
|
||||
scenic_spots: 景区列表
|
||||
products: 产品列表
|
||||
autoJudge: 是否自动进行内容审核
|
||||
|
||||
Returns:
|
||||
请求ID、选题索引和生成的内容(包含judgeSuccess状态)
|
||||
@ -135,24 +135,22 @@ class TweetService:
|
||||
topicIndex = topic.get('index', 'N/A')
|
||||
logger.info(f"开始为选题 {topicIndex} 生成内容{'(含审核)' if autoJudge else ''}")
|
||||
|
||||
# 增强版的topic处理:优先使用ID获取最新数据
|
||||
enhanced_topic = await self._enhance_topic_with_database_data(topic)
|
||||
|
||||
# 如果没有通过ID获取到数据,使用传入的对象参数作为兜底
|
||||
if style_objects and not enhanced_topic.get('style_object'):
|
||||
# 核心修改:创建一个增强版的topic,将所有需要的信息预先填充好
|
||||
enhanced_topic = topic.copy()
|
||||
if style_objects:
|
||||
enhanced_topic['style_object'] = style_objects[0]
|
||||
enhanced_topic['style'] = style_objects[0].get('styleName')
|
||||
if audience_objects and not enhanced_topic.get('audience_object'):
|
||||
if audience_objects:
|
||||
enhanced_topic['audience_object'] = audience_objects[0]
|
||||
enhanced_topic['targetAudience'] = audience_objects[0].get('audienceName')
|
||||
if scenic_spot_objects and not enhanced_topic.get('scenic_spot_object'):
|
||||
if scenic_spot_objects:
|
||||
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
|
||||
enhanced_topic['object'] = scenic_spot_objects[0].get('name')
|
||||
if product_objects and not enhanced_topic.get('product_object'):
|
||||
if product_objects:
|
||||
enhanced_topic['product_object'] = product_objects[0]
|
||||
enhanced_topic['product'] = product_objects[0].get('productName')
|
||||
|
||||
# 使用PromptBuilderService构建提示词
|
||||
# 使用PromptBuilderService构建提示词,现在它只需要enhanced_topic
|
||||
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
|
||||
|
||||
# 使用预构建的提示词生成内容
|
||||
@ -181,82 +179,24 @@ class TweetService:
|
||||
content = {k: v for k, v in judged_content.items() if k != 'judge_success'}
|
||||
content['judgeSuccess'] = True
|
||||
else:
|
||||
logger.warning(f"选题 {topicIndex} 内容审核未通过")
|
||||
# 审核失败:使用原始内容,添加judgeSuccess状态
|
||||
logger.warning(f"选题 {topicIndex} 内容审核失败,保持原始内容")
|
||||
# 审核失败:保持原始内容,添加judgeSuccess=False标记
|
||||
content['judgeSuccess'] = False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"选题 {topicIndex} 内容审核过程中发生错误: {e}", exc_info=True)
|
||||
# 审核出错:使用原始内容,标记审核失败
|
||||
logger.error(f"选题 {topicIndex} 内嵌审核失败: {e},保持原始内容")
|
||||
# 审核异常:保持原始内容,添加judgeSuccess=False标记
|
||||
content['judgeSuccess'] = False
|
||||
else:
|
||||
# 未启用审核:添加judgeSuccess=None标记,表示未进行审核
|
||||
content['judgeSuccess'] = None
|
||||
|
||||
# 生成请求ID
|
||||
requestId = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
|
||||
|
||||
logger.info(f"选题 {topicIndex} 内容生成完成,请求ID: {requestId}")
|
||||
logger.info(f"内容生成完成,请求ID: {requestId}, 选题索引: {topicIndex}")
|
||||
return requestId, topicIndex, content
|
||||
|
||||
async def _enhance_topic_with_database_data(self, topic: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
使用数据库数据增强选题信息
|
||||
|
||||
Args:
|
||||
topic: 原始选题数据
|
||||
|
||||
Returns:
|
||||
增强后的选题数据
|
||||
"""
|
||||
enhanced_topic = topic.copy()
|
||||
|
||||
try:
|
||||
# 通过数据库服务获取详细信息
|
||||
db_service = DatabaseService(self.config_manager)
|
||||
|
||||
if not db_service.is_available():
|
||||
logger.warning("数据库服务不可用,无法增强选题数据")
|
||||
return enhanced_topic
|
||||
|
||||
# 处理风格ID
|
||||
if 'styleIds' in topic and topic['styleIds']:
|
||||
style_id = topic['styleIds'][0] if isinstance(topic['styleIds'], list) else topic['styleIds']
|
||||
style_data = db_service.get_style_by_id(style_id)
|
||||
if style_data:
|
||||
enhanced_topic['style_object'] = style_data
|
||||
enhanced_topic['style'] = style_data.get('styleName')
|
||||
logger.info(f"从数据库加载风格数据: {style_data.get('styleName')} (ID: {style_id})")
|
||||
|
||||
# 处理受众ID
|
||||
if 'audienceIds' in topic and topic['audienceIds']:
|
||||
audience_id = topic['audienceIds'][0] if isinstance(topic['audienceIds'], list) else topic['audienceIds']
|
||||
audience_data = db_service.get_audience_by_id(audience_id)
|
||||
if audience_data:
|
||||
enhanced_topic['audience_object'] = audience_data
|
||||
enhanced_topic['targetAudience'] = audience_data.get('audienceName')
|
||||
logger.info(f"从数据库加载受众数据: {audience_data.get('audienceName')} (ID: {audience_id})")
|
||||
|
||||
# 处理景区ID
|
||||
if 'scenicSpotIds' in topic and topic['scenicSpotIds']:
|
||||
spot_id = topic['scenicSpotIds'][0] if isinstance(topic['scenicSpotIds'], list) else topic['scenicSpotIds']
|
||||
spot_data = db_service.get_scenic_spot_by_id(spot_id)
|
||||
if spot_data:
|
||||
enhanced_topic['scenic_spot_object'] = spot_data
|
||||
enhanced_topic['object'] = spot_data.get('name')
|
||||
logger.info(f"从数据库加载景区数据: {spot_data.get('name')} (ID: {spot_id})")
|
||||
|
||||
# 处理产品ID
|
||||
if 'productIds' in topic and topic['productIds']:
|
||||
product_id = topic['productIds'][0] if isinstance(topic['productIds'], list) else topic['productIds']
|
||||
product_data = db_service.get_product_by_id(product_id)
|
||||
if product_data:
|
||||
enhanced_topic['product_object'] = product_data
|
||||
enhanced_topic['product'] = product_data.get('productName')
|
||||
logger.info(f"从数据库加载产品数据: {product_data.get('productName')} (ID: {product_id})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"增强选题数据时发生错误: {e}", exc_info=True)
|
||||
|
||||
return enhanced_topic
|
||||
|
||||
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Tuple[str, str, Dict[str, Any]]:
|
||||
"""
|
||||
使用预构建的提示词为选题生成内容
|
||||
@ -363,7 +303,6 @@ class TweetService:
|
||||
|
||||
for topic in topics:
|
||||
topicIndex = topic.get('index', 'unknown')
|
||||
# 直接传递带有ID的选题数据,不再需要传递额外的对象参数
|
||||
_, _, content = await self.generate_content(topic, autoJudge=autoJudge)
|
||||
|
||||
if autoJudge:
|
||||
|
||||
@ -4,10 +4,5 @@
|
||||
"password": "Kj#9mP2$",
|
||||
"database": "travel_content",
|
||||
"port": 3306,
|
||||
"charset": "utf8mb4",
|
||||
"pool_size": 10,
|
||||
"max_retry_attempts": 3,
|
||||
"query_timeout": 30,
|
||||
"soft_delete_field": "isDelete",
|
||||
"active_record_value": 0
|
||||
"charset": "utf8mb4"
|
||||
}
|
||||
@ -1,8 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Tweet模块 - 负责文字内容的生成、审核和管理
|
||||
"""
|
||||
|
||||
__version__ = '1.0.0'
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,142 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
内容生成模块
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Dict, Any, Tuple, Optional
|
||||
|
||||
from core.ai import AIAgent
|
||||
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
|
||||
from utils.prompts import ContentPromptBuilder
|
||||
from utils.file_io import OutputManager, process_llm_json_text
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContentGenerator:
|
||||
"""负责为单个选题生成内容"""
|
||||
|
||||
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
|
||||
self.ai_agent = ai_agent
|
||||
self.config_manager = config_manager
|
||||
self.topic_config = config_manager.get_config('topic_gen', GenerateTopicConfig)
|
||||
self.content_config = config_manager.get_config('content_gen', GenerateContentConfig)
|
||||
self.output_manager = output_manager
|
||||
self.prompt_builder = ContentPromptBuilder(config_manager)
|
||||
|
||||
async def generate_content_for_topic(self, topic: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
为单个选题生成内容
|
||||
|
||||
Args:
|
||||
topic: 选题信息字典
|
||||
|
||||
Returns:
|
||||
包含生成内容的字典
|
||||
"""
|
||||
topic_index = topic.get('index', 'N/A')
|
||||
logger.info(f"开始为选题 {topic_index} 生成内容...")
|
||||
|
||||
# 1. 构建提示
|
||||
# 使用模板构建器分别获取系统和用户提示
|
||||
system_prompt = self.prompt_builder.get_system_prompt()
|
||||
user_prompt = self.prompt_builder.build_user_prompt(topic=topic)
|
||||
|
||||
# 保存提示以供调试
|
||||
output_dir = self.output_manager.get_topic_dir(topic_index)
|
||||
self.output_manager.save_text(system_prompt, "content_system_prompt.txt", subdir=output_dir.name)
|
||||
self.output_manager.save_text(user_prompt, "content_user_prompt.txt", subdir=output_dir.name)
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.content_config, 'model') and isinstance(self.content_config.model, dict):
|
||||
model_params = {
|
||||
'temperature': self.content_config.model.get('temperature'),
|
||||
'top_p': self.content_config.model.get('top_p'),
|
||||
'presence_penalty': self.content_config.model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 2. 调用AI
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True,
|
||||
stage="内容生成",
|
||||
**model_params
|
||||
)
|
||||
self.output_manager.save_text(raw_result, "content_raw_response.txt", subdir=output_dir.name)
|
||||
except Exception as e:
|
||||
logger.critical(f"为选题 {topic_index} 生成内容时AI调用失败: {e}", exc_info=True)
|
||||
return {"error": str(e)}
|
||||
|
||||
# 3. 解析和保存结果
|
||||
content_data = process_llm_json_text(raw_result)
|
||||
if content_data:
|
||||
self.output_manager.save_json(content_data, "article.json", subdir=output_dir.name)
|
||||
logger.info(f"成功为选题 {topic_index} 生成并保存内容。")
|
||||
return content_data
|
||||
else:
|
||||
logger.error(f"解析内容JSON失败 for {topic_index}")
|
||||
return {"error": "JSONDecodeError", "raw_content": raw_result}
|
||||
|
||||
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Dict[str, Any]:
|
||||
"""
|
||||
使用已构建的提示词生成内容
|
||||
|
||||
Args:
|
||||
topic: 选题信息字典
|
||||
system_prompt: 已构建好的系统提示词
|
||||
user_prompt: 已构建好的用户提示词
|
||||
|
||||
Returns:
|
||||
包含生成内容的字典
|
||||
"""
|
||||
topic_index = topic.get('index', 'N/A')
|
||||
logger.info(f"使用预构建提示词为选题 {topic_index} 生成内容...")
|
||||
|
||||
# 保存提示以供调试
|
||||
output_dir = self.output_manager.get_topic_dir(topic_index)
|
||||
self.output_manager.save_text(system_prompt, "content_system_prompt.txt", subdir=output_dir.name)
|
||||
self.output_manager.save_text(user_prompt, "content_user_prompt.txt", subdir=output_dir.name)
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.content_config, 'model') and isinstance(self.content_config.model, dict):
|
||||
model_params = {
|
||||
'temperature': self.content_config.model.get('temperature'),
|
||||
'top_p': self.content_config.model.get('top_p'),
|
||||
'presence_penalty': self.content_config.model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 调用AI
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True,
|
||||
stage="内容生成",
|
||||
**model_params
|
||||
)
|
||||
self.output_manager.save_text(raw_result, "content_raw_response.txt", subdir=output_dir.name)
|
||||
except Exception as e:
|
||||
logger.critical(f"为选题 {topic_index} 生成内容时AI调用失败: {e}", exc_info=True)
|
||||
return {"error": str(e)}
|
||||
|
||||
# 解析和保存结果
|
||||
content_data = process_llm_json_text(raw_result)
|
||||
if content_data:
|
||||
self.output_manager.save_json(content_data, "article.json", subdir=output_dir.name)
|
||||
logger.info(f"成功为选题 {topic_index} 生成并保存内容。")
|
||||
return content_data
|
||||
else:
|
||||
logger.error(f"解析内容JSON失败 for {topic_index}")
|
||||
return {"error": "JSONDecodeError", "raw_content": raw_result}
|
||||
@ -1,212 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
内容审核模块
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Dict, Any, Union
|
||||
|
||||
from core.ai import AIAgent
|
||||
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
|
||||
from utils.prompts import JudgerPromptBuilder
|
||||
from utils.file_io import process_llm_json_text
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContentJudger:
|
||||
"""内容审核类,使用AI评估和修正内容"""
|
||||
|
||||
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager=None):
|
||||
"""
|
||||
初始化内容审核器
|
||||
|
||||
Args:
|
||||
ai_agent: AIAgent实例
|
||||
config_manager: 配置管理器
|
||||
output_manager: 输出管理器,用于保存提示词和响应
|
||||
"""
|
||||
self.ai_agent = ai_agent
|
||||
self.config_manager = config_manager
|
||||
self.topic_config = config_manager.get_config('topic_gen', GenerateTopicConfig)
|
||||
self.content_config = config_manager.get_config('content_gen', GenerateContentConfig)
|
||||
self.prompt_builder = JudgerPromptBuilder(config_manager)
|
||||
self.output_manager = output_manager
|
||||
|
||||
async def judge_content(self, generated_content: Union[str, Dict[str, Any]], topic: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
调用AI审核生成的内容
|
||||
|
||||
Args:
|
||||
generated_content: 已生成的原始内容(JSON字符串或字典对象)
|
||||
topic: 与内容相关的原始选题字典
|
||||
|
||||
Returns:
|
||||
一个包含审核结果的字典
|
||||
"""
|
||||
logger.info("开始审核生成的内容...")
|
||||
|
||||
# 获取主题索引,用于保存文件
|
||||
topic_index = topic.get('index', 'unknown')
|
||||
topic_dir = f"topic_{topic_index}"
|
||||
|
||||
# 从原始内容中提取tag
|
||||
original_tag = []
|
||||
original_content = process_llm_json_text(generated_content)
|
||||
if original_content and isinstance(original_content, dict) and "tag" in original_content:
|
||||
original_tag = original_content.get("tag", [])
|
||||
logger.info(f"从原始内容中提取到标签: {original_tag}")
|
||||
else:
|
||||
logger.warning("从原始内容提取标签失败")
|
||||
|
||||
# 将字典转换为JSON字符串,以便在提示中使用
|
||||
if isinstance(generated_content, dict):
|
||||
generated_content_str = json.dumps(generated_content, ensure_ascii=False, indent=2)
|
||||
else:
|
||||
generated_content_str = str(generated_content)
|
||||
|
||||
# 1. 构建提示
|
||||
system_prompt = self.prompt_builder.get_system_prompt()
|
||||
user_prompt = self.prompt_builder.build_user_prompt(
|
||||
generated_content=generated_content_str,
|
||||
topic=topic
|
||||
)
|
||||
|
||||
# 保存提示词
|
||||
if self.output_manager:
|
||||
self.output_manager.save_text(system_prompt, f"{topic_dir}/judger_system_prompt.txt")
|
||||
self.output_manager.save_text(user_prompt, f"{topic_dir}/judger_user_prompt.txt")
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.content_config, 'judger_model') and isinstance(self.content_config.judger_model, dict):
|
||||
model_params = {
|
||||
'temperature': self.content_config.judger_model.get('temperature'),
|
||||
'top_p': self.content_config.judger_model.get('top_p'),
|
||||
'presence_penalty': self.content_config.judger_model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 2. 调用AI进行审核
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True,
|
||||
stage="内容审核",
|
||||
**model_params
|
||||
)
|
||||
|
||||
# 保存原始响应
|
||||
if self.output_manager:
|
||||
self.output_manager.save_text(raw_result, f"{topic_dir}/judger_raw_response.txt")
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"内容审核时AI调用失败: {e}", exc_info=True)
|
||||
return {"judge_success": False, "error": str(e)}
|
||||
|
||||
# 3. 解析结果
|
||||
judged_data = process_llm_json_text(raw_result)
|
||||
if judged_data and isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
|
||||
judged_data["judge_success"] = True
|
||||
|
||||
# 直接使用原始内容中的标签
|
||||
if original_tag:
|
||||
judged_data["tag"] = original_tag
|
||||
# 如果原始内容中没有标签,则使用默认标签
|
||||
logger.info(f"内容审核成功完成,使用标签: {judged_data.get('tag', [])}")
|
||||
|
||||
# 保存审核后的内容
|
||||
if self.output_manager:
|
||||
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
|
||||
|
||||
return judged_data
|
||||
else:
|
||||
logger.warning(f"审核响应JSON格式不正确或缺少键")
|
||||
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}
|
||||
|
||||
async def judge_content_with_prompt(self, generated_content: Union[str, Dict[str, Any]], topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Dict[str, Any]:
|
||||
"""
|
||||
使用预构建的提示词审核生成的内容
|
||||
|
||||
Args:
|
||||
generated_content: 已生成的原始内容(JSON字符串或字典对象)
|
||||
topic: 与内容相关的原始选题字典
|
||||
system_prompt: 系统提示词
|
||||
user_prompt: 用户提示词
|
||||
|
||||
Returns:
|
||||
一个包含审核结果的字典
|
||||
"""
|
||||
logger.info("开始使用预构建提示词审核生成的内容...")
|
||||
|
||||
# 获取主题索引,用于保存文件
|
||||
topic_index = topic.get('index', 'unknown')
|
||||
topic_dir = f"topic_{topic_index}"
|
||||
|
||||
# 从原始内容中提取tag
|
||||
original_tag = []
|
||||
original_content = process_llm_json_text(generated_content)
|
||||
if original_content and isinstance(original_content, dict) and "tag" in original_content:
|
||||
original_tag = original_content.get("tag", [])
|
||||
logger.info(f"从原始内容中提取到标签: {original_tag}")
|
||||
else:
|
||||
logger.warning("从原始内容提取标签失败")
|
||||
|
||||
# 保存提示词
|
||||
if self.output_manager:
|
||||
self.output_manager.save_text(system_prompt, f"{topic_dir}/judger_system_prompt.txt")
|
||||
self.output_manager.save_text(user_prompt, f"{topic_dir}/judger_user_prompt.txt")
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.content_config, 'judger_model') and isinstance(self.content_config.judger_model, dict):
|
||||
model_params = {
|
||||
'temperature': self.content_config.judger_model.get('temperature'),
|
||||
'top_p': self.content_config.judger_model.get('top_p'),
|
||||
'presence_penalty': self.content_config.judger_model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 2. 调用AI进行审核
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True,
|
||||
stage="内容审核",
|
||||
**model_params
|
||||
)
|
||||
|
||||
# 保存原始响应
|
||||
if self.output_manager:
|
||||
self.output_manager.save_text(raw_result, f"{topic_dir}/judger_raw_response.txt")
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"内容审核时AI调用失败: {e}", exc_info=True)
|
||||
return {"judge_success": False, "error": str(e)}
|
||||
|
||||
# 3. 解析结果
|
||||
judged_data = process_llm_json_text(raw_result)
|
||||
if judged_data and isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
|
||||
judged_data["judge_success"] = True
|
||||
judged_data.pop("analysis")
|
||||
# 直接使用原始内容中的标签
|
||||
if original_tag:
|
||||
judged_data["tag"] = original_tag
|
||||
# 如果原始内容中没有标签,则使用默认标签
|
||||
logger.info(f"内容审核成功完成,使用标签: {judged_data.get('tag', [])}")
|
||||
|
||||
# 保存审核后的内容
|
||||
if self.output_manager:
|
||||
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
|
||||
|
||||
return judged_data
|
||||
else:
|
||||
logger.warning(f"审核响应JSON格式不正确或缺少键")
|
||||
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}
|
||||
@ -1,149 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
选题生成模块
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
from core.ai import AIAgent
|
||||
from core.config import ConfigManager, GenerateTopicConfig
|
||||
from utils.prompts import TopicPromptBuilder
|
||||
from utils.file_io import OutputManager, process_llm_json_text
|
||||
from .topic_parser import TopicParser
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TopicGenerator:
|
||||
"""
|
||||
选题生成器
|
||||
负责生成旅游相关的选题
|
||||
"""
|
||||
|
||||
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
|
||||
"""
|
||||
初始化选题生成器
|
||||
|
||||
Args:
|
||||
ai_agent: AI代理
|
||||
config_manager: 配置管理器
|
||||
output_manager: 输出管理器
|
||||
"""
|
||||
self.ai_agent = ai_agent
|
||||
self.config_manager = config_manager
|
||||
self.config = config_manager.get_config('topic_gen', GenerateTopicConfig)
|
||||
self.output_manager = output_manager
|
||||
self.prompt_builder = TopicPromptBuilder(config_manager)
|
||||
self.parser = TopicParser()
|
||||
|
||||
async def generate_topics(self) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
执行完整的选题生成流程:构建提示 -> 调用AI -> 解析结果 -> 保存产物
|
||||
"""
|
||||
logger.info("开始执行选题生成流程...")
|
||||
|
||||
# 1. 构建提示
|
||||
system_prompt = self.prompt_builder.get_system_prompt()
|
||||
user_prompt = self.prompt_builder.build_user_prompt(
|
||||
numTopics=self.config.topic.num,
|
||||
month=self.config.topic.date
|
||||
)
|
||||
self.output_manager.save_text(system_prompt, "topic_system_prompt.txt")
|
||||
self.output_manager.save_text(user_prompt, "topic_user_prompt.txt")
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.config, 'model') and isinstance(self.config.model, dict):
|
||||
model_params = {
|
||||
'temperature': self.config.model.get('temperature'),
|
||||
'top_p': self.config.model.get('top_p'),
|
||||
'presence_penalty': self.config.model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 2. 调用AI生成
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True, # 选题生成通常不需要流式输出
|
||||
stage="选题生成",
|
||||
**model_params
|
||||
)
|
||||
self.output_manager.save_text(raw_result, "topics_raw_response.txt")
|
||||
except Exception as e:
|
||||
logger.critical(f"AI调用失败,无法生成选题: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
# 3. 解析结果
|
||||
topics = self.parser.parse(raw_result)
|
||||
if not topics:
|
||||
logger.error("未能从AI响应中解析出任何有效选题")
|
||||
return None
|
||||
|
||||
# 4. 保存结果
|
||||
self.output_manager.save_json(topics, "topics.json")
|
||||
logger.info(f"成功生成并保存 {len(topics)} 个选题")
|
||||
|
||||
return topics
|
||||
|
||||
async def generate_topics_with_prompt(self, system_prompt: str, user_prompt: str) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
使用预构建的提示词生成选题
|
||||
|
||||
Args:
|
||||
system_prompt: 已构建好的系统提示词
|
||||
user_prompt: 已构建好的用户提示词
|
||||
|
||||
Returns:
|
||||
生成的选题列表,如果失败则返回None
|
||||
"""
|
||||
logger.info("使用预构建提示词开始执行选题生成流程...")
|
||||
|
||||
# 保存提示以供调试
|
||||
self.output_manager.save_text(system_prompt, "topic_system_prompt.txt")
|
||||
self.output_manager.save_text(user_prompt, "topic_user_prompt.txt")
|
||||
|
||||
# 获取模型参数
|
||||
model_params = {}
|
||||
if hasattr(self.config, 'model') and isinstance(self.config.model, dict):
|
||||
model_params = {
|
||||
'temperature': self.config.model.get('temperature'),
|
||||
'top_p': self.config.model.get('top_p'),
|
||||
'presence_penalty': self.config.model.get('presence_penalty')
|
||||
}
|
||||
# 移除None值
|
||||
model_params = {k: v for k, v in model_params.items() if v is not None}
|
||||
|
||||
# 调用AI生成
|
||||
try:
|
||||
raw_result, _, _, _ = await self.ai_agent.generate_text(
|
||||
system_prompt=system_prompt,
|
||||
user_prompt=user_prompt,
|
||||
use_stream=True,
|
||||
stage="选题生成",
|
||||
**model_params
|
||||
)
|
||||
self.output_manager.save_text(raw_result, "topics_raw_response.txt")
|
||||
except Exception as e:
|
||||
logger.critical(f"AI调用失败,无法生成选题: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
# 解析结果
|
||||
topics = self.parser.parse(raw_result)
|
||||
if not topics:
|
||||
logger.error("未能从AI响应中解析出任何有效选题")
|
||||
return None
|
||||
|
||||
# 保存结果
|
||||
self.output_manager.save_json(topics, "topics.json")
|
||||
logger.info(f"成功生成并保存 {len(topics)} 个选题")
|
||||
|
||||
return topics
|
||||
|
||||
|
||||
|
||||
@ -1,59 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
AI响应解析器模块
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from utils.file_io import process_llm_json_text
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TopicParser:
|
||||
"""
|
||||
解析和验证由AI模型生成的选题列表
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def parse(raw_text: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
从原始文本解析、修复和验证JSON
|
||||
|
||||
Args:
|
||||
raw_text: AI模型返回的原始字符串
|
||||
|
||||
Returns:
|
||||
一个字典列表,每个字典代表一个有效的选题
|
||||
"""
|
||||
logger.info("开始解析AI生成的选题...")
|
||||
|
||||
# 使用通用JSON解析函数解析原始文本
|
||||
parsed_json = process_llm_json_text(raw_text)
|
||||
|
||||
if not parsed_json:
|
||||
logger.error("解析AI响应失败,无法获取JSON数据")
|
||||
return []
|
||||
|
||||
if not isinstance(parsed_json, list):
|
||||
logger.error(f"解析结果不是列表,而是 {type(parsed_json)}")
|
||||
return []
|
||||
|
||||
logger.info(f"成功解析 {len(parsed_json)} 个选题对象。开始验证...")
|
||||
|
||||
# 验证每个选题是否包含所有必需的键
|
||||
valid_topics = []
|
||||
required_keys = {"index", "date", "logic", "object", "product", "style", "targetAudience"}
|
||||
optional_keys = {"productLogic", "styleLogic", "targetAudienceLogic"}
|
||||
|
||||
for i, item in enumerate(parsed_json):
|
||||
if isinstance(item, dict) and required_keys.issubset(item.keys()):
|
||||
valid_topics.append(item)
|
||||
else:
|
||||
logger.warning(f"第 {i+1} 个选题缺少必需键或格式不正确: {item}")
|
||||
|
||||
logger.info(f"验证完成,获得 {len(valid_topics)} 个有效选题。")
|
||||
return valid_topics
|
||||
Binary file not shown.
@ -1,148 +0,0 @@
|
||||
# ID映射机制优化说明
|
||||
|
||||
## 🎯 解决的问题
|
||||
|
||||
### 原有问题
|
||||
1. **数据库查询失败**:ID查询时经常找不到对应数据
|
||||
2. **名称匹配不准确**:AI生成的选题名称与数据库中的名称不完全匹配
|
||||
3. **缺少兜底机制**:查询失败时没有备用方案
|
||||
4. **ID追踪缺失**:选题生成后无法保持ID的连续性
|
||||
|
||||
## 🔧 优化方案
|
||||
|
||||
### 1. 增强ID映射逻辑 (`api/routers/tweet.py`)
|
||||
|
||||
#### 模糊匹配机制
|
||||
```python
|
||||
def find_best_match(target_name: str, mapping: Dict[str, int]) -> Optional[int]:
|
||||
# 1. 精确匹配
|
||||
if target_name in mapping:
|
||||
return mapping[target_name]
|
||||
|
||||
# 2. 模糊匹配 - 去除空格后匹配
|
||||
target_clean = target_name.replace(" ", "").strip()
|
||||
for name, id_val in mapping.items():
|
||||
if name.replace(" ", "").strip() == target_clean:
|
||||
return id_val
|
||||
|
||||
# 3. 包含匹配 - 检查是否互相包含
|
||||
for name, id_val in mapping.items():
|
||||
if target_clean in name.replace(" ", "") or name.replace(" ", "") in target_clean:
|
||||
return id_val
|
||||
|
||||
# 4. 未找到匹配
|
||||
logger.warning(f"未找到匹配的ID: '{target_name}'")
|
||||
return None
|
||||
```
|
||||
|
||||
#### 匹配率监控
|
||||
- 记录每个选题的ID匹配情况
|
||||
- 计算匹配率并在匹配率低于50%时发出警告
|
||||
- 添加匹配元数据用于调试
|
||||
|
||||
### 2. 数据库服务兜底机制 (`api/services/database_service.py`)
|
||||
|
||||
#### 批量查询增强
|
||||
```python
|
||||
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
|
||||
# 检查哪些ID没有找到对应记录
|
||||
found_ids = {result['id'] for result in results}
|
||||
missing_ids = set(styleIds) - found_ids
|
||||
|
||||
if missing_ids:
|
||||
# 添加兜底数据
|
||||
fallback_styles = self._get_fallback_styles(list(missing_ids))
|
||||
results.extend(fallback_styles)
|
||||
```
|
||||
|
||||
#### 兜底数据提供
|
||||
- 当数据库查询失败时,提供默认的结构化数据
|
||||
- 标记兜底数据 (`_is_fallback: True`)
|
||||
- 确保系统可以继续运行
|
||||
|
||||
### 3. 内容生成阶段优化 (`api/services/tweet.py`)
|
||||
|
||||
#### 智能数据增强
|
||||
```python
|
||||
async def _enhance_topic_with_database_data(self, topic: Dict[str, Any]) -> Dict[str, Any]:
|
||||
# 优先使用ID从数据库获取最新数据
|
||||
if 'styleIds' in topic and topic['styleIds']:
|
||||
style_data = db_service.get_style_by_id(style_id)
|
||||
if style_data:
|
||||
enhanced_topic['style_object'] = style_data
|
||||
enhanced_topic['style'] = style_data.get('styleName')
|
||||
```
|
||||
|
||||
#### 多级兜底策略
|
||||
1. **第一级**:通过ID从数据库获取最新数据
|
||||
2. **第二级**:使用传入的对象参数作为兜底
|
||||
3. **第三级**:使用数据库服务的兜底数据
|
||||
|
||||
## 🔄 完整流程
|
||||
|
||||
### 选题生成阶段
|
||||
1. 接收ID列表 → 查询数据库获取完整对象
|
||||
2. 构建ID到名称的映射关系
|
||||
3. AI生成选题(包含名称)
|
||||
4. 将生成的选题名称映射回ID
|
||||
5. 返回包含ID的选题数据
|
||||
|
||||
### 内容生成阶段
|
||||
1. 接收带ID的选题数据
|
||||
2. 通过ID从数据库获取最新的详细信息
|
||||
3. 增强选题数据
|
||||
4. 生成内容
|
||||
|
||||
## 🎉 预期效果
|
||||
|
||||
### 1. 数据一致性
|
||||
- 确保整个流程中ID的连续性
|
||||
- 避免名称不匹配导致的数据丢失
|
||||
|
||||
### 2. 系统稳定性
|
||||
- 多级兜底机制确保系统不会因为数据库问题而崩溃
|
||||
- 详细的日志记录便于问题排查
|
||||
|
||||
### 3. 数据准确性
|
||||
- 内容生成时使用最新的数据库数据
|
||||
- 避免使用过期或不准确的缓存数据
|
||||
|
||||
### 4. 可观测性
|
||||
- 匹配率监控
|
||||
- 详细的日志记录
|
||||
- 兜底数据标记
|
||||
|
||||
## 🚀 使用建议
|
||||
|
||||
### 1. 监控日志
|
||||
关注以下日志信息:
|
||||
- ID匹配率低于50%的警告
|
||||
- 兜底数据使用情况
|
||||
- 数据库查询失败的频率
|
||||
|
||||
### 2. 数据维护
|
||||
- 定期清理重复数据
|
||||
- 更新"请修改产品名字"等占位数据
|
||||
- 确保软删除字段的正确使用
|
||||
|
||||
### 3. 性能优化
|
||||
- 考虑为常用数据添加缓存
|
||||
- 优化数据库查询性能
|
||||
- 定期清理无效数据
|
||||
|
||||
## 📋 测试验证
|
||||
|
||||
### 1. 功能测试
|
||||
- 测试各种ID组合的选题生成
|
||||
- 验证名称匹配的准确性
|
||||
- 确认兜底机制的有效性
|
||||
|
||||
### 2. 性能测试
|
||||
- 大批量选题生成的性能
|
||||
- 数据库查询的响应时间
|
||||
- 内存使用情况
|
||||
|
||||
### 3. 错误处理测试
|
||||
- 数据库连接失败时的行为
|
||||
- 无效ID的处理
|
||||
- 数据缺失时的兜底效果
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user