commit 5fc2c1cd08
jinye_huang 2025-07-29 20:39:26 +08:00
33 changed files with 1079 additions and 96 deletions

Binary file not shown.

View File

@@ -49,15 +49,40 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
try:
# Create temporary files for the base64-encoded documents
if request.documents:
temp_files = []
for doc in request.documents:
try:
# Extract the actual payload from the base64 content (skip a prefix like data:image/jpeg;base64,)
content = doc.content
if ',' in content:
content = content.split(',', 1)[1]
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(doc.filename)[1]) as temp_file:
suffix = os.path.splitext(doc.filename)[1]
if not suffix:
# Infer the file suffix from the MIME type
mime_to_ext = {
'text/plain': '.txt',
'application/pdf': '.pdf',
'application/msword': '.doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
'image/jpeg': '.jpg',
'image/png': '.png'
}
suffix = mime_to_ext.get(doc.mime_type, '.bin')
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
# Decode the base64 content and write it to the temporary file
content = base64.b64decode(doc.content)
temp_file.write(content)
temp_files.append(temp_file.name)
try:
decoded_content = base64.b64decode(content)
temp_file.write(decoded_content)
temp_files.append(temp_file.name)
logger.info(f"成功保存临时文件: {temp_file.name}")
except Exception as e:
logger.error(f"Base64解码失败: {e}")
raise HTTPException(
status_code=400,
detail=f"文档 {doc.filename} 的Base64内容无效: {str(e)}"
)
except Exception as e:
logger.error(f"处理文档 {doc.filename} 失败: {e}")
raise HTTPException(
@@ -70,8 +95,8 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
# Delegate to the service layer
result = await integration_service.integrate_content(
document_paths=temp_files,
keywords=request.keywords,
cookies=request.cookies,
keywords=request.keywords or [],
cookies=request.cookies or "",
sort_type=request.sort_type,
note_type=request.note_type,
note_time=request.note_time,
@@ -80,36 +105,7 @@ async def integrate_content(request: ContentIntegrationRequest) -> ContentIntegr
query_num=request.query_num
)
# Convert to the response model
if result["success"]:
response = ContentIntegrationResponse(
success=True,
timestamp=result["timestamp"],
processing_time=result["processing_time"],
input_summary=result["input_summary"],
document_info=result["document_info"],
xhs_info=result["xhs_info"],
integrated_content=result["integrated_content"],
search_config=result["search_config"],
error_message=None  # no error message on success
)
logger.info(f"内容整合成功,处理时间:{result['processing_time']}")
else:
from datetime import datetime
response = ContentIntegrationResponse(
success=False,
timestamp=result.get("timestamp", datetime.now().strftime("%Y%m%d_%H%M%S")),
processing_time=result.get("processing_time", "0 seconds"),
input_summary=result.get("input_summary"),
document_info=result.get("document_info"),
xhs_info=result.get("xhs_info"),
integrated_content=result.get("integrated_content"),
search_config=result.get("search_config"),
error_message=result.get("error_message")
)
logger.error(f"内容整合失败:{result['error_message']}")
return response
return result
except Exception as e:
logger.error(f"内容整合接口异常:{e}", exc_info=True)

View File

@@ -125,38 +125,96 @@ def _add_ids_to_topics(topics: List[Dict[str, Any]], id_name_mappings: Dict[str,
Returns:
Topic list enriched with ID fields
"""
def find_best_match(target_name: str, mapping: Dict[str, int]) -> Optional[int]:
"""
Find the best-matching ID, with fuzzy matching support
"""
if not target_name or not mapping:
return None
# 1. Exact match
if target_name in mapping:
return mapping[target_name]
# 2. Fuzzy match - compare after removing spaces
target_clean = target_name.replace(" ", "").strip()
for name, id_val in mapping.items():
if name.replace(" ", "").strip() == target_clean:
logger.info(f"模糊匹配成功: '{target_name}' -> '{name}' (ID: {id_val})")
return id_val
# 3. Containment match - check whether either name contains the other
for name, id_val in mapping.items():
if target_clean in name.replace(" ", "") or name.replace(" ", "") in target_clean:
logger.info(f"包含匹配成功: '{target_name}' -> '{name}' (ID: {id_val})")
return id_val
# 4. No match found
logger.warning(f"No matching ID found for '{target_name}'; available options: {list(mapping.keys())}")
return None
enriched_topics = []
for topic in topics:
# Copy the original topic
enriched_topic = topic.copy()
# Add ID fields
# Initialize the ID fields
enriched_topic['styleIds'] = []
enriched_topic['audienceIds'] = []
enriched_topic['scenicSpotIds'] = []
enriched_topic['productIds'] = []
# Track match results
match_results = {
'style_matched': False,
'audience_matched': False,
'scenic_spot_matched': False,
'product_matched': False
}
# Look up the IDs for the names in the topic
if 'style' in topic and topic['style']:
style_name = topic['style']
if style_name in id_name_mappings['style_mapping']:
enriched_topic['styleIds'] = [id_name_mappings['style_mapping'][style_name]]
style_id = find_best_match(topic['style'], id_name_mappings['style_mapping'])
if style_id:
enriched_topic['styleIds'] = [style_id]
match_results['style_matched'] = True
if 'targetAudience' in topic and topic['targetAudience']:
audience_name = topic['targetAudience']
if audience_name in id_name_mappings['audience_mapping']:
enriched_topic['audienceIds'] = [id_name_mappings['audience_mapping'][audience_name]]
audience_id = find_best_match(topic['targetAudience'], id_name_mappings['audience_mapping'])
if audience_id:
enriched_topic['audienceIds'] = [audience_id]
match_results['audience_matched'] = True
if 'object' in topic and topic['object']:
spot_name = topic['object']
if spot_name in id_name_mappings['scenic_spot_mapping']:
enriched_topic['scenicSpotIds'] = [id_name_mappings['scenic_spot_mapping'][spot_name]]
spot_id = find_best_match(topic['object'], id_name_mappings['scenic_spot_mapping'])
if spot_id:
enriched_topic['scenicSpotIds'] = [spot_id]
match_results['scenic_spot_matched'] = True
if 'product' in topic and topic['product']:
product_name = topic['product']
if product_name in id_name_mappings['product_mapping']:
enriched_topic['productIds'] = [id_name_mappings['product_mapping'][product_name]]
product_id = find_best_match(topic['product'], id_name_mappings['product_mapping'])
if product_id:
enriched_topic['productIds'] = [product_id]
match_results['product_matched'] = True
# Log the match statistics
total_fields = sum(1 for key in ['style', 'targetAudience', 'object', 'product'] if key in topic and topic[key])
matched_fields = sum(match_results.values())
if total_fields > 0:
match_rate = matched_fields / total_fields * 100
logger.info(f"选题 {topic.get('index', 'N/A')} ID匹配率: {match_rate:.1f}% ({matched_fields}/{total_fields})")
# 如果匹配率低于50%,记录警告
if match_rate < 50:
logger.warning(f"选题 {topic.get('index', 'N/A')} ID匹配率较低: {match_rate:.1f}%")
# 添加匹配元数据
# enriched_topic['_id_match_metadata'] = {
# 'match_results': match_results,
# 'match_rate': matched_fields / max(total_fields, 1) * 100 if total_fields > 0 else 0
# }
enriched_topics.append(enriched_topic)

View File

@@ -78,7 +78,8 @@ class ContentIntegrationService:
Integration result dictionary
"""
start_time = time.time()
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords)}")
logger.info(f"开始整合任务:文档数量 {len(document_paths)}, 关键词数量 {len(keywords) if keywords else 0}")
try:
# Ensure the output directory exists

View File

@@ -55,10 +55,23 @@ class DatabaseService:
logger.info(f"尝试连接数据库 ({attempt['desc']}): {connection_info}")
# 创建连接池
# 从配置中分离MySQL连接池支持的参数和不支持的参数
config = attempt["config"].copy()
# Parameters the MySQL connection pool does not accept; remove them
unsupported_params = [
'max_retry_attempts', 'query_timeout', 'soft_delete_field', 'active_record_value'
]
for param in unsupported_params:
config.pop(param, None)
# Set pool parameters, using values from the config file or defaults
pool_size = config.pop('pool_size', 10)
pool = pooling.MySQLConnectionPool(
pool_name=f"database_service_pool_{int(time.time())}",
pool_size=10,
**attempt["config"]
pool_size=pool_size,
**config
)
# Test the connection
@@ -210,7 +223,8 @@ class DatabaseService:
List of scenic spot records
"""
if not self.db_pool or not spot_ids:
return []
logger.warning("数据库连接池未初始化或景区ID列表为空")
return self._get_fallback_scenic_spots(spot_ids)
try:
with self.db_pool.get_connection() as conn:
@@ -220,12 +234,42 @@ class DatabaseService:
query = f"SELECT * FROM scenicSpot WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, spot_ids)
results = cursor.fetchall()
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len(results)}")
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(spot_ids) - found_ids
if missing_ids:
logger.warning(f"部分景区ID未找到: {missing_ids}")
# 添加兜底数据
fallback_spots = self._get_fallback_scenic_spots(list(missing_ids))
results.extend(fallback_spots)
logger.info(f"批量查询景区信息: 请求{len(spot_ids)}个,找到{len([r for r in results if r['id'] in spot_ids])}")
return results
except Exception as e:
logger.error(f"批量查询景区信息失败: {e}")
return []
return self._get_fallback_scenic_spots(spot_ids)
def _get_fallback_scenic_spots(self, spot_ids: List[int]) -> List[Dict[str, Any]]:
"""
Build fallback data for scenic spots
"""
fallback_spots = []
for spot_id in spot_ids:
fallback_spots.append({
'id': spot_id,
'name': f'Scenic spot {spot_id}',
'address': 'Default address',
'advantage': 'Beautiful natural scenery',
'highlight': 'An attraction worth visiting',
'isPublic': 1,
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底景区数据: {len(fallback_spots)}")
return fallback_spots
def get_products_by_ids(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
@@ -238,7 +282,8 @@ class DatabaseService:
List of product records
"""
if not self.db_pool or not productIds:
return []
logger.warning("数据库连接池未初始化或产品ID列表为空")
return self._get_fallback_products(productIds)
try:
with self.db_pool.get_connection() as conn:
@@ -248,12 +293,42 @@ class DatabaseService:
query = f"SELECT * FROM product WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, productIds)
results = cursor.fetchall()
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len(results)}")
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(productIds) - found_ids
if missing_ids:
logger.warning(f"部分产品ID未找到: {missing_ids}")
# 添加兜底数据
fallback_products = self._get_fallback_products(list(missing_ids))
results.extend(fallback_products)
logger.info(f"批量查询产品信息: 请求{len(productIds)}个,找到{len([r for r in results if r['id'] in productIds])}")
return results
except Exception as e:
logger.error(f"批量查询产品信息失败: {e}")
return []
return self._get_fallback_products(productIds)
def _get_fallback_products(self, productIds: List[int]) -> List[Dict[str, Any]]:
"""
Build fallback data for products
"""
fallback_products = []
for product_id in productIds:
fallback_products.append({
'id': product_id,
'productName': f'Product {product_id}',
'originPrice': 999,
'realPrice': 888,
'packageInfo': 'Includes attraction tickets and guide service',
'advantage': 'Great value for money, quality service',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底产品数据: {len(fallback_products)}")
return fallback_products
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
@@ -266,7 +341,8 @@ class DatabaseService:
List of style records
"""
if not self.db_pool or not styleIds:
return []
logger.warning("数据库连接池未初始化或风格ID列表为空")
return self._get_fallback_styles(styleIds)
try:
with self.db_pool.get_connection() as conn:
@@ -276,12 +352,39 @@ class DatabaseService:
query = f"SELECT * FROM contentStyle WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, styleIds)
results = cursor.fetchall()
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len(results)}")
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(styleIds) - found_ids
if missing_ids:
logger.warning(f"部分风格ID未找到: {missing_ids}")
# 添加兜底数据
fallback_styles = self._get_fallback_styles(list(missing_ids))
results.extend(fallback_styles)
logger.info(f"批量查询风格信息: 请求{len(styleIds)}个,找到{len([r for r in results if r['id'] in styleIds])}")
return results
except Exception as e:
logger.error(f"批量查询风格信息失败: {e}")
return []
return self._get_fallback_styles(styleIds)
def _get_fallback_styles(self, styleIds: List[int]) -> List[Dict[str, Any]]:
"""
Build fallback data for styles
"""
fallback_styles = []
for style_id in styleIds:
fallback_styles.append({
'id': style_id,
'styleName': f'Style {style_id}',
'description': 'Default style description',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底风格数据: {len(fallback_styles)}")
return fallback_styles
def get_audiences_by_ids(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
@@ -294,7 +397,8 @@ class DatabaseService:
List of audience records
"""
if not self.db_pool or not audienceIds:
return []
logger.warning("数据库连接池未初始化或受众ID列表为空")
return self._get_fallback_audiences(audienceIds)
try:
with self.db_pool.get_connection() as conn:
@@ -304,12 +408,39 @@ class DatabaseService:
query = f"SELECT * FROM targetAudience WHERE id IN ({placeholders}) AND isDelete = 0"
cursor.execute(query, audienceIds)
results = cursor.fetchall()
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len(results)}")
# 检查是否所有ID都找到了对应的记录
found_ids = {result['id'] for result in results}
missing_ids = set(audienceIds) - found_ids
if missing_ids:
logger.warning(f"部分受众ID未找到: {missing_ids}")
# 添加兜底数据
fallback_audiences = self._get_fallback_audiences(list(missing_ids))
results.extend(fallback_audiences)
logger.info(f"批量查询受众信息: 请求{len(audienceIds)}个,找到{len([r for r in results if r['id'] in audienceIds])}")
return results
except Exception as e:
logger.error(f"批量查询受众信息失败: {e}")
return []
return self._get_fallback_audiences(audienceIds)
def _get_fallback_audiences(self, audienceIds: List[int]) -> List[Dict[str, Any]]:
"""
Build fallback data for audiences
"""
fallback_audiences = []
for audience_id in audienceIds:
fallback_audiences.append({
'id': audience_id,
'audienceName': f'Audience {audience_id}',
'description': 'Default audience description',
'isDelete': 0,
'_is_fallback': True
})
logger.info(f"使用兜底受众数据: {len(fallback_audiences)}")
return fallback_audiences
def list_all_scenic_spots(self, user_id: Optional[int] = None, is_public: Optional[bool] = None) -> List[Dict[str, Any]]:
"""

View File

@@ -114,10 +114,23 @@ class PromptService:
logger.info(f"Attempting database connection ({attempt['desc']}): {connection_info}")
# Create the connection pool
# Split the config into parameters the MySQL connection pool supports and ones it does not
config = attempt["config"].copy()
# Parameters the MySQL connection pool does not accept; remove them
unsupported_params = [
'max_retry_attempts', 'query_timeout', 'soft_delete_field', 'active_record_value'
]
for param in unsupported_params:
config.pop(param, None)
# Set pool parameters, using values from the config file or defaults
pool_size = config.pop('pool_size', 5)
pool = pooling.MySQLConnectionPool(
pool_name=f"prompt_service_pool_{int(time.time())}",
pool_size=5,
**attempt["config"]
pool_size=pool_size,
**config
)
# Test the connection

View File

@@ -19,6 +19,7 @@ from tweet.content_generator import ContentGenerator
from tweet.content_judger import ContentJudger
from api.services.prompt_builder import PromptBuilderService
from api.services.prompt_service import PromptService
from api.services.database_service import DatabaseService
logger = logging.getLogger(__name__)
@@ -109,22 +110,21 @@ class TweetService:
logger.info(f"Topic generation complete, request ID: {requestId}, count: {len(topics)}")
return requestId, topics
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None,
autoJudge: bool = False) -> Tuple[str, str, Dict[str, Any]]:
async def generate_content(self, topic: Optional[Dict[str, Any]] = None, autoJudge: bool = False,
style_objects: Optional[List[Dict[str, Any]]] = None,
audience_objects: Optional[List[Dict[str, Any]]] = None,
scenic_spot_objects: Optional[List[Dict[str, Any]]] = None,
product_objects: Optional[List[Dict[str, Any]]] = None) -> Tuple[str, str, Dict[str, Any]]:
"""
Generate content for a topic
Generate content for a single topic
Args:
topic: Topic information
styles: List of styles
audiences: List of audiences
scenic_spots: List of scenic spots
products: List of products
autoJudge: Whether to review the content automatically
topic: Topic information (may include ID fields)
autoJudge: Whether to run inline review
style_objects: List of style objects (optional, for compatibility)
audience_objects: List of audience objects (optional, for compatibility)
scenic_spot_objects: List of scenic spot objects (optional, for compatibility)
product_objects: List of product objects (optional, for compatibility)
Returns:
Request ID, topic index, and the generated content (including the judgeSuccess status)
@@ -135,22 +135,24 @@ class TweetService:
topicIndex = topic.get('index', 'N/A')
logger.info(f"开始为选题 {topicIndex} 生成内容{'(含审核)' if autoJudge else ''}")
# 核心修改创建一个增强版的topic将所有需要的信息预先填充好
enhanced_topic = topic.copy()
if style_objects:
# Enhanced topic handling: prefer IDs for fetching the latest data
enhanced_topic = await self._enhance_topic_with_database_data(topic)
# If no data was obtained via IDs, fall back to the passed-in object parameters
if style_objects and not enhanced_topic.get('style_object'):
enhanced_topic['style_object'] = style_objects[0]
enhanced_topic['style'] = style_objects[0].get('styleName')
if audience_objects:
if audience_objects and not enhanced_topic.get('audience_object'):
enhanced_topic['audience_object'] = audience_objects[0]
enhanced_topic['targetAudience'] = audience_objects[0].get('audienceName')
if scenic_spot_objects:
if scenic_spot_objects and not enhanced_topic.get('scenic_spot_object'):
enhanced_topic['scenic_spot_object'] = scenic_spot_objects[0]
enhanced_topic['object'] = scenic_spot_objects[0].get('name')
if product_objects:
if product_objects and not enhanced_topic.get('product_object'):
enhanced_topic['product_object'] = product_objects[0]
enhanced_topic['product'] = product_objects[0].get('productName')
# Build the prompts with PromptBuilderService; it now only needs enhanced_topic
# Build the prompts with PromptBuilderService
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
# Generate content with the pre-built prompts
@@ -179,23 +181,81 @@ class TweetService:
content = {k: v for k, v in judged_content.items() if k != 'judge_success'}
content['judgeSuccess'] = True
else:
logger.warning(f"选题 {topicIndex} 内容审核失败,保持原始内容")
# 审核失败:保持原始内容添加judgeSuccess=False标记
logger.warning(f"选题 {topicIndex} 内容审核未通过")
# 审核失败:使用原始内容添加judgeSuccess状态
content['judgeSuccess'] = False
except Exception as e:
logger.error(f"选题 {topicIndex}嵌审核失败: {e},保持原始内容")
# 审核异常保持原始内容添加judgeSuccess=False标记
logger.error(f"选题 {topicIndex}容审核过程中发生错误: {e}", exc_info=True)
# 审核出错:使用原始内容,标记审核失败
content['judgeSuccess'] = False
else:
# Review disabled: add a judgeSuccess=None flag to indicate no review was performed
content['judgeSuccess'] = None
# Generate a request ID
requestId = f"content-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{str(uuid.uuid4())[:8]}"
logger.info(f"内容生成完成请求ID: {requestId}, 选题索引: {topicIndex}")
logger.info(f"选题 {topicIndex} 内容生成完成请求ID: {requestId}")
return requestId, topicIndex, content
async def _enhance_topic_with_database_data(self, topic: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrich the topic information with database data
Args:
topic: The original topic data
Returns:
The enriched topic data
"""
enhanced_topic = topic.copy()
try:
# Fetch detailed information via the database service
db_service = DatabaseService(self.config_manager)
if not db_service.is_available():
logger.warning("数据库服务不可用,无法增强选题数据")
return enhanced_topic
# Handle style IDs
if 'styleIds' in topic and topic['styleIds']:
style_id = topic['styleIds'][0] if isinstance(topic['styleIds'], list) else topic['styleIds']
style_data = db_service.get_style_by_id(style_id)
if style_data:
enhanced_topic['style_object'] = style_data
enhanced_topic['style'] = style_data.get('styleName')
logger.info(f"从数据库加载风格数据: {style_data.get('styleName')} (ID: {style_id})")
# 处理受众ID
if 'audienceIds' in topic and topic['audienceIds']:
audience_id = topic['audienceIds'][0] if isinstance(topic['audienceIds'], list) else topic['audienceIds']
audience_data = db_service.get_audience_by_id(audience_id)
if audience_data:
enhanced_topic['audience_object'] = audience_data
enhanced_topic['targetAudience'] = audience_data.get('audienceName')
logger.info(f"从数据库加载受众数据: {audience_data.get('audienceName')} (ID: {audience_id})")
# 处理景区ID
if 'scenicSpotIds' in topic and topic['scenicSpotIds']:
spot_id = topic['scenicSpotIds'][0] if isinstance(topic['scenicSpotIds'], list) else topic['scenicSpotIds']
spot_data = db_service.get_scenic_spot_by_id(spot_id)
if spot_data:
enhanced_topic['scenic_spot_object'] = spot_data
enhanced_topic['object'] = spot_data.get('name')
logger.info(f"从数据库加载景区数据: {spot_data.get('name')} (ID: {spot_id})")
# 处理产品ID
if 'productIds' in topic and topic['productIds']:
product_id = topic['productIds'][0] if isinstance(topic['productIds'], list) else topic['productIds']
product_data = db_service.get_product_by_id(product_id)
if product_data:
enhanced_topic['product_object'] = product_data
enhanced_topic['product'] = product_data.get('productName')
logger.info(f"从数据库加载产品数据: {product_data.get('productName')} (ID: {product_id})")
except Exception as e:
logger.error(f"增强选题数据时发生错误: {e}", exc_info=True)
return enhanced_topic
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Tuple[str, str, Dict[str, Any]]:
"""
@@ -303,6 +363,7 @@ class TweetService:
for topic in topics:
topicIndex = topic.get('index', 'unknown')
# Pass the ID-bearing topic data directly; extra object parameters are no longer needed
_, _, content = await self.generate_content(topic, autoJudge=autoJudge)
if autoJudge:

View File

@@ -4,5 +4,10 @@
"password": "civmek-rezTed-0hovre",
"database": "travel_content",
"port": 3306,
"charset": "utf8mb4"
"charset": "utf8mb4",
"pool_size": 10,
"max_retry_attempts": 3,
"query_timeout": 30,
"soft_delete_field": "isDelete",
"active_record_value": 0
}

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Tweet module - generates, reviews, and manages text content
"""
__version__ = '1.0.0'

Binary file not shown.

View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content generation module
"""
import logging
import json
from typing import Dict, Any, Tuple, Optional
from core.ai import AIAgent
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
from utils.prompts import ContentPromptBuilder
from utils.file_io import OutputManager, process_llm_json_text
logger = logging.getLogger(__name__)
class ContentGenerator:
"""负责为单个选题生成内容"""
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
self.ai_agent = ai_agent
self.config_manager = config_manager
self.topic_config = config_manager.get_config('topic_gen', GenerateTopicConfig)
self.content_config = config_manager.get_config('content_gen', GenerateContentConfig)
self.output_manager = output_manager
self.prompt_builder = ContentPromptBuilder(config_manager)
async def generate_content_for_topic(self, topic: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate content for a single topic
Args:
topic: Topic information dictionary
Returns:
Dictionary containing the generated content
"""
topic_index = topic.get('index', 'N/A')
logger.info(f"开始为选题 {topic_index} 生成内容...")
# 1. 构建提示
# 使用模板构建器分别获取系统和用户提示
system_prompt = self.prompt_builder.get_system_prompt()
user_prompt = self.prompt_builder.build_user_prompt(topic=topic)
# Save the prompts for debugging
output_dir = self.output_manager.get_topic_dir(topic_index)
self.output_manager.save_text(system_prompt, "content_system_prompt.txt", subdir=output_dir.name)
self.output_manager.save_text(user_prompt, "content_user_prompt.txt", subdir=output_dir.name)
# Get the model parameters
model_params = {}
if hasattr(self.content_config, 'model') and isinstance(self.content_config.model, dict):
model_params = {
'temperature': self.content_config.model.get('temperature'),
'top_p': self.content_config.model.get('top_p'),
'presence_penalty': self.content_config.model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# 2. Call the AI
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,
stage="内容生成",
**model_params
)
self.output_manager.save_text(raw_result, "content_raw_response.txt", subdir=output_dir.name)
except Exception as e:
logger.critical(f"为选题 {topic_index} 生成内容时AI调用失败: {e}", exc_info=True)
return {"error": str(e)}
# 3. Parse and save the result
content_data = process_llm_json_text(raw_result)
if content_data:
self.output_manager.save_json(content_data, "article.json", subdir=output_dir.name)
logger.info(f"成功为选题 {topic_index} 生成并保存内容。")
return content_data
else:
logger.error(f"解析内容JSON失败 for {topic_index}")
return {"error": "JSONDecodeError", "raw_content": raw_result}
async def generate_content_with_prompt(self, topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Dict[str, Any]:
"""
Generate content using pre-built prompts
Args:
topic: Topic information dictionary
system_prompt: The pre-built system prompt
user_prompt: The pre-built user prompt
Returns:
Dictionary containing the generated content
"""
topic_index = topic.get('index', 'N/A')
logger.info(f"使用预构建提示词为选题 {topic_index} 生成内容...")
# 保存提示以供调试
output_dir = self.output_manager.get_topic_dir(topic_index)
self.output_manager.save_text(system_prompt, "content_system_prompt.txt", subdir=output_dir.name)
self.output_manager.save_text(user_prompt, "content_user_prompt.txt", subdir=output_dir.name)
# Get the model parameters
model_params = {}
if hasattr(self.content_config, 'model') and isinstance(self.content_config.model, dict):
model_params = {
'temperature': self.content_config.model.get('temperature'),
'top_p': self.content_config.model.get('top_p'),
'presence_penalty': self.content_config.model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# Call the AI
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,
stage="内容生成",
**model_params
)
self.output_manager.save_text(raw_result, "content_raw_response.txt", subdir=output_dir.name)
except Exception as e:
logger.critical(f"为选题 {topic_index} 生成内容时AI调用失败: {e}", exc_info=True)
return {"error": str(e)}
# Parse and save the result
content_data = process_llm_json_text(raw_result)
if content_data:
self.output_manager.save_json(content_data, "article.json", subdir=output_dir.name)
logger.info(f"成功为选题 {topic_index} 生成并保存内容。")
return content_data
else:
logger.error(f"解析内容JSON失败 for {topic_index}")
return {"error": "JSONDecodeError", "raw_content": raw_result}

View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Content review module
"""
import logging
import json
from typing import Dict, Any, Union
from core.ai import AIAgent
from core.config import ConfigManager, GenerateTopicConfig, GenerateContentConfig
from utils.prompts import JudgerPromptBuilder
from utils.file_io import process_llm_json_text
logger = logging.getLogger(__name__)
class ContentJudger:
"""内容审核类使用AI评估和修正内容"""
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager=None):
"""
Initialize the content reviewer
Args:
ai_agent: AIAgent instance
config_manager: Configuration manager
output_manager: Output manager, used to save prompts and responses
"""
self.ai_agent = ai_agent
self.config_manager = config_manager
self.topic_config = config_manager.get_config('topic_gen', GenerateTopicConfig)
self.content_config = config_manager.get_config('content_gen', GenerateContentConfig)
self.prompt_builder = JudgerPromptBuilder(config_manager)
self.output_manager = output_manager
async def judge_content(self, generated_content: Union[str, Dict[str, Any]], topic: Dict[str, Any]) -> Dict[str, Any]:
"""
Call the AI to review the generated content
Args:
generated_content: The raw generated content (JSON string or dict)
topic: The original topic dict the content belongs to
Returns:
A dictionary containing the review result
"""
logger.info("开始审核生成的内容...")
# 获取主题索引,用于保存文件
topic_index = topic.get('index', 'unknown')
topic_dir = f"topic_{topic_index}"
# Extract tags from the original content
original_tag = []
original_content = process_llm_json_text(generated_content)
if original_content and isinstance(original_content, dict) and "tag" in original_content:
original_tag = original_content.get("tag", [])
logger.info(f"从原始内容中提取到标签: {original_tag}")
else:
logger.warning("从原始内容提取标签失败")
# 将字典转换为JSON字符串以便在提示中使用
if isinstance(generated_content, dict):
generated_content_str = json.dumps(generated_content, ensure_ascii=False, indent=2)
else:
generated_content_str = str(generated_content)
# 1. Build the prompts
system_prompt = self.prompt_builder.get_system_prompt()
user_prompt = self.prompt_builder.build_user_prompt(
generated_content=generated_content_str,
topic=topic
)
# Save the prompts
if self.output_manager:
self.output_manager.save_text(system_prompt, f"{topic_dir}/judger_system_prompt.txt")
self.output_manager.save_text(user_prompt, f"{topic_dir}/judger_user_prompt.txt")
# Get the model parameters
model_params = {}
if hasattr(self.content_config, 'judger_model') and isinstance(self.content_config.judger_model, dict):
model_params = {
'temperature': self.content_config.judger_model.get('temperature'),
'top_p': self.content_config.judger_model.get('top_p'),
'presence_penalty': self.content_config.judger_model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# 2. Call the AI for review
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,
stage="内容审核",
**model_params
)
# Save the raw response
if self.output_manager:
self.output_manager.save_text(raw_result, f"{topic_dir}/judger_raw_response.txt")
except Exception as e:
logger.critical(f"内容审核时AI调用失败: {e}", exc_info=True)
return {"judge_success": False, "error": str(e)}
# 3. Parse the result
judged_data = process_llm_json_text(raw_result)
if judged_data and isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
judged_data["judge_success"] = True
# Use the tags from the original content directly
if original_tag:
judged_data["tag"] = original_tag
# If the original content has no tags, fall back to default tags
logger.info(f"Content review completed successfully, using tags: {judged_data.get('tag', [])}")
# Save the reviewed content
if self.output_manager:
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
return judged_data
else:
logger.warning(f"审核响应JSON格式不正确或缺少键")
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}
async def judge_content_with_prompt(self, generated_content: Union[str, Dict[str, Any]], topic: Dict[str, Any], system_prompt: str, user_prompt: str) -> Dict[str, Any]:
"""
Review the generated content using pre-built prompts
Args:
generated_content: The raw generated content (JSON string or dict)
topic: The original topic dict the content belongs to
system_prompt: The system prompt
user_prompt: The user prompt
Returns:
A dictionary containing the review result
"""
logger.info("开始使用预构建提示词审核生成的内容...")
# 获取主题索引,用于保存文件
topic_index = topic.get('index', 'unknown')
topic_dir = f"topic_{topic_index}"
# Extract tags from the original content
original_tag = []
original_content = process_llm_json_text(generated_content)
if original_content and isinstance(original_content, dict) and "tag" in original_content:
original_tag = original_content.get("tag", [])
logger.info(f"从原始内容中提取到标签: {original_tag}")
else:
logger.warning("从原始内容提取标签失败")
# 保存提示词
if self.output_manager:
self.output_manager.save_text(system_prompt, f"{topic_dir}/judger_system_prompt.txt")
self.output_manager.save_text(user_prompt, f"{topic_dir}/judger_user_prompt.txt")
# Get the model parameters
model_params = {}
if hasattr(self.content_config, 'judger_model') and isinstance(self.content_config.judger_model, dict):
model_params = {
'temperature': self.content_config.judger_model.get('temperature'),
'top_p': self.content_config.judger_model.get('top_p'),
'presence_penalty': self.content_config.judger_model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# 2. Call the AI for review
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,
stage="内容审核",
**model_params
)
# Save the raw response
if self.output_manager:
self.output_manager.save_text(raw_result, f"{topic_dir}/judger_raw_response.txt")
except Exception as e:
logger.critical(f"内容审核时AI调用失败: {e}", exc_info=True)
return {"judge_success": False, "error": str(e)}
# 3. Parse the result
judged_data = process_llm_json_text(raw_result)
if judged_data and isinstance(judged_data, dict) and "title" in judged_data and "content" in judged_data:
judged_data["judge_success"] = True
judged_data.pop("analysis")
# 直接使用原始内容中的标签
if original_tag:
judged_data["tag"] = original_tag
# If the original content has no tags, fall back to default tags
logger.info(f"Content review completed successfully, using tags: {judged_data.get('tag', [])}")
# Save the reviewed content
if self.output_manager:
self.output_manager.save_json(judged_data, f"{topic_dir}/article_judged.json")
return judged_data
else:
logger.warning(f"审核响应JSON格式不正确或缺少键")
return {"judge_success": False, "error": "Invalid JSON response", "raw_response": raw_result}

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Topic generation module
"""
import logging
from typing import Dict, Any, List, Optional, Tuple
from core.ai import AIAgent
from core.config import ConfigManager, GenerateTopicConfig
from utils.prompts import TopicPromptBuilder
from utils.file_io import OutputManager, process_llm_json_text
from .topic_parser import TopicParser
logger = logging.getLogger(__name__)
class TopicGenerator:
"""
Topic generator
Responsible for generating travel-related topics
"""
def __init__(self, ai_agent: AIAgent, config_manager: ConfigManager, output_manager: OutputManager):
"""
Initialize the topic generator
Args:
ai_agent: AI agent
config_manager: Configuration manager
output_manager: Output manager
"""
self.ai_agent = ai_agent
self.config_manager = config_manager
self.config = config_manager.get_config('topic_gen', GenerateTopicConfig)
self.output_manager = output_manager
self.prompt_builder = TopicPromptBuilder(config_manager)
self.parser = TopicParser()
async def generate_topics(self) -> Optional[List[Dict[str, Any]]]:
"""
Run the full topic generation pipeline: build prompts -> call the AI -> parse the result -> save the artifacts
"""
logger.info("开始执行选题生成流程...")
# 1. 构建提示
system_prompt = self.prompt_builder.get_system_prompt()
user_prompt = self.prompt_builder.build_user_prompt(
numTopics=self.config.topic.num,
month=self.config.topic.date
)
self.output_manager.save_text(system_prompt, "topic_system_prompt.txt")
self.output_manager.save_text(user_prompt, "topic_user_prompt.txt")
# Get the model parameters
model_params = {}
if hasattr(self.config, 'model') and isinstance(self.config.model, dict):
model_params = {
'temperature': self.config.model.get('temperature'),
'top_p': self.config.model.get('top_p'),
'presence_penalty': self.config.model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# 2. Call the AI to generate
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,  # topic generation usually does not need streaming output
stage="topic generation",
**model_params
)
self.output_manager.save_text(raw_result, "topics_raw_response.txt")
except Exception as e:
logger.critical(f"AI调用失败无法生成选题: {e}", exc_info=True)
return None
# 3. Parse the result
topics = self.parser.parse(raw_result)
if not topics:
logger.error("未能从AI响应中解析出任何有效选题")
return None
# 4. Save the result
self.output_manager.save_json(topics, "topics.json")
logger.info(f"成功生成并保存 {len(topics)} 个选题")
return topics
async def generate_topics_with_prompt(self, system_prompt: str, user_prompt: str) -> Optional[List[Dict[str, Any]]]:
"""
Generate topics using pre-built prompts
Args:
system_prompt: The pre-built system prompt
user_prompt: The pre-built user prompt
Returns:
The generated topic list, or None on failure
"""
logger.info("使用预构建提示词开始执行选题生成流程...")
# 保存提示以供调试
self.output_manager.save_text(system_prompt, "topic_system_prompt.txt")
self.output_manager.save_text(user_prompt, "topic_user_prompt.txt")
# Get the model parameters
model_params = {}
if hasattr(self.config, 'model') and isinstance(self.config.model, dict):
model_params = {
'temperature': self.config.model.get('temperature'),
'top_p': self.config.model.get('top_p'),
'presence_penalty': self.config.model.get('presence_penalty')
}
# Remove None values
model_params = {k: v for k, v in model_params.items() if v is not None}
# Call the AI to generate
try:
raw_result, _, _, _ = await self.ai_agent.generate_text(
system_prompt=system_prompt,
user_prompt=user_prompt,
use_stream=True,
stage="选题生成",
**model_params
)
self.output_manager.save_text(raw_result, "topics_raw_response.txt")
except Exception as e:
logger.critical(f"AI调用失败无法生成选题: {e}", exc_info=True)
return None
# Parse the result
topics = self.parser.parse(raw_result)
if not topics:
logger.error("未能从AI响应中解析出任何有效选题")
return None
# Save the result
self.output_manager.save_json(topics, "topics.json")
logger.info(f"成功生成并保存 {len(topics)} 个选题")
return topics

View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI response parser module
"""
import logging
import json
from typing import List, Dict, Any
from utils.file_io import process_llm_json_text
logger = logging.getLogger(__name__)
class TopicParser:
"""
Parses and validates the topic list generated by the AI model
"""
@staticmethod
def parse(raw_text: str) -> List[Dict[str, Any]]:
"""
Parse, repair, and validate the JSON from raw text
Args:
raw_text: The raw string returned by the AI model
Returns:
A list of dictionaries, each representing a valid topic
"""
logger.info("开始解析AI生成的选题...")
# 使用通用JSON解析函数解析原始文本
parsed_json = process_llm_json_text(raw_text)
if not parsed_json:
logger.error("解析AI响应失败无法获取JSON数据")
return []
if not isinstance(parsed_json, list):
logger.error(f"解析结果不是列表,而是 {type(parsed_json)}")
return []
logger.info(f"成功解析 {len(parsed_json)} 个选题对象。开始验证...")
# 验证每个选题是否包含所有必需的键
valid_topics = []
required_keys = {"index", "date", "logic", "object", "product", "style", "targetAudience"}
optional_keys = {"productLogic", "styleLogic", "targetAudienceLogic"}
for i, item in enumerate(parsed_json):
if isinstance(item, dict) and required_keys.issubset(item.keys()):
valid_topics.append(item)
else:
logger.warning(f"{i+1} 个选题缺少必需键或格式不正确: {item}")
logger.info(f"验证完成,获得 {len(valid_topics)} 个有效选题。")
return valid_topics

View File

@@ -0,0 +1,148 @@
# ID Mapping Mechanism Optimization Notes
## 🎯 Problems Addressed
### Previous Issues
1. **Database lookups failed**: ID queries frequently found no matching data
2. **Inaccurate name matching**: topic names generated by the AI did not exactly match the names in the database
3. **No fallback mechanism**: there was no backup plan when a query failed
4. **Lost ID tracking**: IDs could not be carried through the pipeline after topic generation
## 🔧 Optimization Plan
### 1. Enhanced ID Mapping Logic (`api/routers/tweet.py`)
#### Fuzzy Matching Mechanism
```python
def find_best_match(target_name: str, mapping: Dict[str, int]) -> Optional[int]:
    # 1. Exact match
    if target_name in mapping:
        return mapping[target_name]
    # 2. Fuzzy match - compare after removing spaces
    target_clean = target_name.replace(" ", "").strip()
    for name, id_val in mapping.items():
        if name.replace(" ", "").strip() == target_clean:
            return id_val
    # 3. Containment match - check whether either name contains the other
    for name, id_val in mapping.items():
        if target_clean in name.replace(" ", "") or name.replace(" ", "") in target_clean:
            return id_val
    # 4. No match found
    logger.warning(f"No matching ID found for '{target_name}'")
    return None
```
#### Match Rate Monitoring
- Record the ID match status of every topic
- Compute the match rate and warn when it falls below 50%
- Attach match metadata for debugging (a minimal sketch of this bookkeeping follows)
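For illustration, a self-contained sketch of that bookkeeping (it simplifies the real code, which only counts fields actually present on the topic; the names mirror the diff):

```python
import logging
from typing import Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_match_rate(topic_index: str, match_results: Dict[str, bool]) -> float:
    """Compute one topic's ID match rate and warn when it drops below 50%."""
    total_fields = len(match_results)
    matched_fields = sum(match_results.values())
    match_rate = matched_fields / total_fields * 100 if total_fields else 0.0
    logger.info("Topic %s ID match rate: %.1f%% (%d/%d)",
                topic_index, match_rate, matched_fields, total_fields)
    if match_rate < 50:
        logger.warning("Topic %s has a low ID match rate: %.1f%%", topic_index, match_rate)
    return match_rate

# Two of four fields matched -> 50.0 (no warning)
log_match_rate("3", {"style": True, "audience": True, "scenic_spot": False, "product": False})
```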
### 2. Database Service Fallback Mechanism (`api/services/database_service.py`)
#### Batch Query Enhancement
```python
def get_styles_by_ids(self, styleIds: List[int]) -> List[Dict[str, Any]]:
    # Check which IDs have no matching record
    found_ids = {result['id'] for result in results}
    missing_ids = set(styleIds) - found_ids
    if missing_ids:
        # Add fallback data
        fallback_styles = self._get_fallback_styles(list(missing_ids))
        results.extend(fallback_styles)
```
#### Fallback Data Provisioning
- Provide default, structured data when a database query fails
- Mark fallback records (`_is_fallback: True`)
- Keep the system running (see the sketch after this list)
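A sketch of such a provider, modeled on the `_get_fallback_styles` helper in the diff (the default strings are placeholders, not real catalog data):

```python
from typing import Any, Dict, List

def build_fallback_styles(style_ids: List[int]) -> List[Dict[str, Any]]:
    """Build placeholder style records for IDs the database did not return."""
    return [
        {
            'id': style_id,
            'styleName': f'Style {style_id}',
            'description': 'Default style description',
            'isDelete': 0,
            # Marker so downstream code can tell real rows from placeholders
            '_is_fallback': True,
        }
        for style_id in style_ids
    ]

print(build_fallback_styles([7, 9]))
```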
### 3. Content Generation Stage Optimization (`api/services/tweet.py`)
#### Smart Data Enrichment
```python
async def _enhance_topic_with_database_data(self, topic: Dict[str, Any]) -> Dict[str, Any]:
    # Prefer IDs for fetching the latest data from the database
    if 'styleIds' in topic and topic['styleIds']:
        style_data = db_service.get_style_by_id(style_id)
        if style_data:
            enhanced_topic['style_object'] = style_data
            enhanced_topic['style'] = style_data.get('styleName')
```
#### Multi-Level Fallback Strategy
1. **Level 1**: fetch the latest data from the database by ID
2. **Level 2**: fall back to the passed-in object parameters
3. **Level 3**: use the database service's fallback data (the chain is sketched below)
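The three levels reduce to a simple precedence chain. A hedged sketch (the parameter names `fetch_by_id`, `passed_object`, and `fallback_object` are illustrative, not the service's actual API):

```python
from typing import Any, Callable, Dict, Optional

def resolve_object(
    object_id: Optional[int],
    fetch_by_id: Callable[[int], Optional[Dict[str, Any]]],
    passed_object: Optional[Dict[str, Any]],
    fallback_object: Dict[str, Any],
) -> Dict[str, Any]:
    """Level 1: database row by ID; level 2: caller-supplied object; level 3: fallback."""
    if object_id is not None:
        row = fetch_by_id(object_id)
        if row:
            return row
    if passed_object:
        return passed_object
    return fallback_object

# Level 1 misses (the lookup returns None), so level 2 supplies the object
style = resolve_object(42, lambda _id: None, {'styleName': 'Photo guide'}, {'styleName': 'Default'})
print(style)  # {'styleName': 'Photo guide'}
```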
## 🔄 End-to-End Flow
### Topic Generation Stage
1. Receive ID lists → query the database for the full objects
2. Build the ID-to-name mappings
3. The AI generates topics (containing names)
4. Map the generated topic names back to IDs (see the round-trip sketch after this list)
5. Return topic data that includes the IDs
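Steps 2 and 4 amount to an ID → name → ID round trip. A minimal sketch with made-up rows (the real code also applies the fuzzy and containment matching shown earlier):

```python
from typing import Dict, List

# Hypothetical rows, shaped like the database service's style records
style_rows: List[Dict] = [
    {'id': 1, 'styleName': 'Travel notes'},
    {'id': 2, 'styleName': 'Photo guide'},
]

# Step 2: build the name-to-ID mapping
style_mapping: Dict[str, int] = {row['styleName']: row['id'] for row in style_rows}

# Step 4: map an AI-generated name back to its ID (exact match only here)
topic = {'style': 'Photo guide'}
topic['styleIds'] = [style_mapping[topic['style']]] if topic['style'] in style_mapping else []
print(topic)  # {'style': 'Photo guide', 'styleIds': [2]}
```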
### Content Generation Stage
1. Receive topic data carrying IDs
2. Fetch the latest detailed information from the database by ID
3. Enrich the topic data
4. Generate the content
## 🎉 Expected Benefits
### 1. Data Consistency
- IDs stay continuous throughout the pipeline
- Avoids data loss caused by name mismatches
### 2. System Stability
- Multi-level fallbacks keep the system from crashing on database problems
- Detailed logging makes troubleshooting easier
### 3. Data Accuracy
- Content generation uses the latest database data
- Avoids stale or inaccurate cached data
### 4. Observability
- Match rate monitoring
- Detailed logging
- Fallback data markers
## 🚀 Usage Recommendations
### 1. Monitor the Logs
Watch for the following log messages:
- Warnings about ID match rates below 50%
- Fallback data usage
- The frequency of database query failures
### 2. Data Maintenance
- Periodically clean up duplicate data
- Replace placeholder entries such as "请修改产品名字" ("please rename this product")
- Make sure the soft-delete field is used correctly
### 3. Performance Optimization
- Consider caching frequently used data (a sketch follows this list)
- Optimize database query performance
- Periodically purge invalid data
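One possible shape for such a cache, as a sketch: a small TTL wrapper around the by-ID lookups (the 300-second TTL and the wrapped `get_style_by_id` call are assumptions, not existing code):

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

def with_ttl_cache(fetch: Callable[[int], Optional[Dict[str, Any]]],
                   ttl: float = 300.0) -> Callable[[int], Optional[Dict[str, Any]]]:
    """Wrap a by-ID lookup with a simple time-based in-memory cache."""
    cache: Dict[int, Tuple[float, Optional[Dict[str, Any]]]] = {}

    def cached_fetch(record_id: int) -> Optional[Dict[str, Any]]:
        now = time.monotonic()
        hit = cache.get(record_id)
        if hit is not None and now - hit[0] < ttl:
            return hit[1]  # still fresh, skip the database round trip
        value = fetch(record_id)
        cache[record_id] = (now, value)
        return value

    return cached_fetch

# Hypothetical usage: get_style_cached = with_ttl_cache(db_service.get_style_by_id)
```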
## 📋 Test Verification
### 1. Functional Tests
- Test topic generation with various ID combinations
- Verify the accuracy of name matching
- Confirm that the fallback mechanism works
### 2. Performance Tests
- Performance of large-batch topic generation
- Database query response times
- Memory usage
### 3. Error Handling Tests
- Behavior when the database connection fails
- Handling of invalid IDs
- Fallback behavior when data is missing (a test sketch follows)
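A hedged pytest-style sketch for the third category, checking that missing IDs come back as marked fallback rows (it reuses the hypothetical `build_fallback_styles` helper sketched earlier):

```python
def test_missing_ids_get_fallback_rows():
    # Assume the database returned only ID 1; IDs 2 and 3 are missing
    requested = [1, 2, 3]
    found = [{'id': 1, 'styleName': 'Travel notes', 'isDelete': 0}]

    missing = sorted(set(requested) - {row['id'] for row in found})
    results = found + build_fallback_styles(missing)  # helper sketched above

    assert {row['id'] for row in results} == set(requested)
    assert all(row.get('_is_fallback') for row in results if row['id'] in (2, 3))
```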