Compare commits

..

No commits in common. "495b7525aeb5695ec99ee221d7a2d772ef68299c" and "fa5f4f5a3ccea14122eebf4966f00020c825e140" have entirely different histories.

19 changed files with 103 additions and 774 deletions

Binary file not shown.

View File

@ -59,9 +59,9 @@ app.add_middleware(
from api.routers import tweet, poster, prompt
# 包含路由
app.include_router(tweet.router, prefix="/api/v1/tweet", tags=["tweet"])
app.include_router(poster.router, prefix="/api/v1/poster", tags=["poster"])
app.include_router(prompt.router, prefix="/api/v1/prompt", tags=["prompt"])
app.include_router(tweet.router, prefix="/api/tweet", tags=["tweet"])
app.include_router(poster.router, prefix="/api/poster", tags=["poster"])
app.include_router(prompt.router, prefix="/api/prompt", tags=["prompt"])
@app.get("/")
async def root():

View File

@ -11,22 +11,18 @@ from pydantic import BaseModel, Field
class TopicRequest(BaseModel):
"""选题生成请求模型"""
dates: Optional[str] = Field(None, description="日期字符串,可能为单个日期、多个日期用逗号分隔或范围如'2023-01-01 to 2023-01-31'")
date: str = Field(..., description="选题日期格式为YYYY-MM-DD")
num_topics: int = Field(5, description="要生成的选题数量", ge=1, le=10)
styles: Optional[List[str]] = Field(None, description="风格列表")
audiences: Optional[List[str]] = Field(None, description="受众列表")
scenic_spots: Optional[List[str]] = Field(None, description="景区列表")
products: Optional[List[str]] = Field(None, description="产品列表")
style: Optional[str] = Field(None, description="内容风格,如'旅游攻略''亲子游'")
target_audience: Optional[str] = Field(None, description="目标受众,如'年轻人''家庭'")
class Config:
schema_extra = {
"example": {
"dates": "2023-07-01 to 2023-07-31",
"num_topics": 5,
"styles": ["旅游攻略", "亲子游"],
"audiences": ["年轻人", "家庭"],
"scenic_spots": ["故宫", "长城"],
"products": ["门票", "导游服务"]
"date": "2023-07-15",
"num_topics": 3,
"style": "旅游攻略",
"target_audience": "年轻人"
}
}
@ -57,29 +53,20 @@ class TopicResponse(BaseModel):
class ContentRequest(BaseModel):
"""内容生成请求模型"""
topic: Optional[Dict[str, Any]] = Field(None, description="选题信息")
styles: Optional[List[str]] = Field(None, description="风格列表")
audiences: Optional[List[str]] = Field(None, description="受众列表")
scenic_spots: Optional[List[str]] = Field(None, description="景区列表")
products: Optional[List[str]] = Field(None, description="产品列表")
auto_judge: bool = Field(False, description="是否自动进行内容审核")
topic: Dict[str, Any] = Field(..., description="选题信息")
class Config:
schema_extra = {
"example": {
"topic": {
"index": "1",
"date": "2024-07-01",
"style": "攻略风",
"target_audience": "亲子向",
"object": "天津冒险湾",
"product": "冒险湾-2大2小套票"
},
"styles": ["攻略风", "种草风"],
"audiences": ["亲子向", "情侣向"],
"scenic_spots": ["天津冒险湾", "北京故宫"],
"products": ["冒险湾-2大2小套票", "故宫门票"],
"auto_judge": True
"date": "2023-07-15",
"object": "北京故宫",
"product": "故宫门票",
"style": "旅游攻略",
"target_audience": "年轻人",
"logic": "暑期旅游热门景点推荐"
}
}
}
@ -106,12 +93,8 @@ class ContentResponse(BaseModel):
class JudgeRequest(BaseModel):
"""内容审核请求模型"""
topic: Optional[Dict[str, Any]] = Field(None, description="选题信息")
topic: Dict[str, Any] = Field(..., description="选题信息")
content: Dict[str, Any] = Field(..., description="要审核的内容")
styles: Optional[List[str]] = Field(None, description="风格列表")
audiences: Optional[List[str]] = Field(None, description="受众列表")
scenic_spots: Optional[List[str]] = Field(None, description="景区列表")
products: Optional[List[str]] = Field(None, description="产品列表")
class Config:
schema_extra = {
@ -129,11 +112,7 @@ class JudgeRequest(BaseModel):
"title": "【北京故宫】避开人潮的秘密路线90%的人都不知道!",
"content": "故宫,作为中国最著名的文化遗产之一...",
"tag": ["北京旅游", "故宫", "旅游攻略", "避暑胜地"]
},
"styles": ["旅游攻略"],
"audiences": ["年轻人"],
"scenic_spots": ["北京故宫"],
"products": ["故宫门票"]
}
}
}
@ -161,27 +140,21 @@ class JudgeResponse(BaseModel):
class PipelineRequest(BaseModel):
"""流水线请求模型"""
dates: Optional[str] = Field(None, description="日期范围,如:'2024-07-01 to 2024-07-31'")
num_topics: int = Field(5, description="要生成的选题数量")
styles: Optional[List[str]] = Field(None, description="风格列表")
audiences: Optional[List[str]] = Field(None, description="受众列表")
scenic_spots: Optional[List[str]] = Field(None, description="景区列表")
products: Optional[List[str]] = Field(None, description="产品列表")
"""完整流程请求模型"""
date: str = Field(..., description="选题日期格式为YYYY-MM-DD")
num_topics: int = Field(5, description="要生成的选题数量", ge=1, le=10)
style: Optional[str] = Field(None, description="内容风格,如'旅游攻略''亲子游'")
target_audience: Optional[str] = Field(None, description="目标受众,如'年轻人''家庭'")
skip_judge: bool = Field(False, description="是否跳过内容审核步骤")
auto_judge: bool = Field(False, description="是否在内容生成时进行内嵌审核")
class Config:
schema_extra = {
"example": {
"dates": "2024-07-01 to 2024-07-31",
"date": "2023-07-15",
"num_topics": 3,
"styles": ["攻略风", "种草风"],
"audiences": ["亲子向", "情侣向"],
"scenic_spots": ["天津冒险湾", "北京故宫"],
"products": ["冒险湾-2大2小套票", "故宫门票"],
"skip_judge": False,
"auto_judge": True
"style": "旅游攻略",
"target_audience": "年轻人",
"skip_judge": False
}
}

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API路由模块
"""

View File

@ -60,21 +60,17 @@ async def generate_topics(
"""
生成选题
- **dates**: 日期字符串可能为单个日期多个日期用逗号分隔或范围
- **date**: 选题日期格式为YYYY-MM-DD
- **num_topics**: 要生成的选题数量
- **styles**: 风格列表
- **audiences**: 受众列表
- **scenic_spots**: 景区列表
- **products**: 产品列表
- **style**: 内容风格'旅游攻略''亲子游'
- **target_audience**: 目标受众'年轻人''家庭'
"""
try:
request_id, topics = await tweet_service.generate_topics(
dates=request.dates,
date=request.date,
num_topics=request.num_topics,
styles=request.styles,
audiences=request.audiences,
scenic_spots=request.scenic_spots,
products=request.products
style=request.style,
target_audience=request.target_audience
)
return TopicResponse(
@ -92,23 +88,13 @@ async def generate_content(
tweet_service: TweetService = Depends(get_tweet_service)
):
"""
生成内容
为选题生成内容
- **topic**: 选题信息
- **styles**: 风格列表
- **audiences**: 受众列表
- **scenic_spots**: 景区列表
- **products**: 产品列表
- **auto_judge**: 是否自动进行内容审核
"""
try:
request_id, topic_index, content = await tweet_service.generate_content(
topic=request.topic,
styles=request.styles,
audiences=request.audiences,
scenic_spots=request.scenic_spots,
products=request.products,
auto_judge=request.auto_judge
topic=request.topic
)
return ContentResponse(
@ -160,19 +146,11 @@ async def judge_content(
- **topic**: 选题信息
- **content**: 要审核的内容
- **styles**: 风格列表
- **audiences**: 受众列表
- **scenic_spots**: 景区列表
- **products**: 产品列表
"""
try:
request_id, topic_index, judged_content, judge_success = await tweet_service.judge_content(
topic=request.topic,
content=request.content,
styles=request.styles,
audiences=request.audiences,
scenic_spots=request.scenic_spots,
products=request.products
content=request.content
)
return JudgeResponse(
@ -192,27 +170,21 @@ async def run_pipeline(
tweet_service: TweetService = Depends(get_tweet_service)
):
"""
运行完整流水线生成选题 生成内容 审核内容
运行完整流水线包括生成选题生成内容和审核内容
- **dates**: 日期范围
- **date**: 选题日期格式为YYYY-MM-DD
- **num_topics**: 要生成的选题数量
- **styles**: 风格列表
- **audiences**: 受众列表
- **scenic_spots**: 景区列表
- **products**: 产品列表
- **style**: 内容风格'旅游攻略''亲子游'
- **target_audience**: 目标受众'年轻人''家庭'
- **skip_judge**: 是否跳过内容审核步骤
- **auto_judge**: 是否在内容生成时进行内嵌审核
"""
try:
request_id, topics, contents, judged_contents = await tweet_service.run_pipeline(
dates=request.dates,
date=request.date,
num_topics=request.num_topics,
styles=request.styles,
audiences=request.audiences,
scenic_spots=request.scenic_spots,
products=request.products,
skip_judge=request.skip_judge,
auto_judge=request.auto_judge
style=request.style,
target_audience=request.target_audience,
skip_judge=request.skip_judge
)
return PipelineResponse(

View File

@ -7,7 +7,7 @@
"""
import logging
from typing import Dict, Any, Optional, Tuple, List
from typing import Dict, Any, Optional, Tuple
from pathlib import Path
from core.config import ConfigManager, GenerateContentConfig, GenerateTopicConfig, PosterConfig
@ -109,85 +109,6 @@ class PromptBuilderService:
return system_prompt, user_prompt
def build_content_prompt_with_params(self, topic: Dict[str, Any],
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
step: str = "content") -> Tuple[str, str]:
"""
使用额外参数构建内容生成提示词
Args:
topic: 选题信息
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
step: 当前步骤用于过滤参考内容
Returns:
系统提示词和用户提示词的元组
"""
# 获取内容生成配置
content_config = self._ensure_content_config()
# 加载系统提示词和用户提示词模板
system_prompt_path = content_config.content_system_prompt
user_prompt_path = content_config.content_user_prompt
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 获取风格内容
style_content = ''
if styles:
style_content = '\n'.join([f"{style}: {self.prompt_service.get_style_content(style)}" for style in styles])
else:
style_filename = topic.get("style", "")
style_content = f"{style_filename}\n{self.prompt_service.get_style_content(style_filename)}"
# 获取目标受众内容
demand_content = ''
if audiences:
demand_content = '\n'.join([f"{audience}: {self.prompt_service.get_audience_content(audience)}" for audience in audiences])
else:
demand_filename = topic.get("target_audience", "")
demand_content = f"{demand_filename}\n{self.prompt_service.get_audience_content(demand_filename)}"
# 获取景区信息
object_content = ''
if scenic_spots:
object_content = '\n'.join([f"{spot}: {self.prompt_service.get_scenic_spot_info(spot)}" for spot in scenic_spots])
else:
object_name = topic.get("object", "")
object_content = f"{object_name}\n{self.prompt_service.get_scenic_spot_info(object_name)}"
# 获取产品信息
product_content = ''
if products:
product_content = '\n'.join([f"{product}: {self.prompt_service.get_product_info(product)}" for product in products])
else:
product_name = topic.get("product", "")
product_content = f"{product_name}\n{self.prompt_service.get_product_info(product_name)}"
# 获取参考内容
refer_content = self.prompt_service.get_refer_content(step)
# 构建系统提示词
system_prompt = template.get_system_prompt()
# 构建用户提示词
user_prompt = template.build_user_prompt(
style_content=style_content,
demand_content=demand_content,
object_content=object_content,
product_content=product_content,
refer_content=refer_content
)
return system_prompt, user_prompt
def build_poster_prompt(self, topic: Dict[str, Any], content: Dict[str, Any]) -> Tuple[str, str]:
"""
构建海报生成提示词
@ -229,17 +150,13 @@ class PromptBuilderService:
return system_prompt, user_prompt
def build_topic_prompt(self, products: Optional[List[str]] = None, scenic_spots: Optional[List[str]] = None, styles: Optional[List[str]] = None, audiences: Optional[List[str]] = None, dates: Optional[str] = None, num_topics: int = 5) -> Tuple[str, str]:
def build_topic_prompt(self, num_topics: int, month: str) -> Tuple[str, str]:
"""
构建选题生成提示词
Args:
products: 产品列表
scenic_spots: 景区列表
styles: 风格列表
audiences: 受众列表
dates: 日期字符串可能为单个日期多个日期用逗号分隔或范围如'2023-01-01 to 2023-01-31'
num_topics: 要生成的选题数量
month: 月份
Returns:
系统提示词和用户提示词的元组
@ -256,51 +173,20 @@ class PromptBuilderService:
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 处理日期
if dates:
if ' to ' in dates:
start_date, end_date = dates.split(' to ')
month = f"{start_date}{end_date}"
elif ',' in dates:
month = ', '.join(dates.split(','))
else:
month = dates
else:
month = ''
# 获取风格列表
styles = self.prompt_service.get_all_styles()
style_content = "Style文件列表:\n" + "\n".join([f"- {style['name']}" for style in styles])
# 获取风格内容
style_content = ''
if styles:
style_content = '\n'.join([f"{style}: {self.prompt_service.get_style_content(style)}" for style in styles])
else:
all_styles = self.prompt_service.get_all_styles()
style_content = "Style文件列表:\n" + "\n".join([f"- {style['name']}" for style in all_styles])
# 获取受众内容
demand_content = ''
if audiences:
demand_content = '\n'.join([f"{audience}: {self.prompt_service.get_audience_content(audience)}" for audience in audiences])
else:
all_audiences = self.prompt_service.get_all_audiences()
demand_content = "Demand文件列表:\n" + "\n".join([f"- {audience['name']}" for audience in all_audiences])
# 获取目标受众列表
audiences = self.prompt_service.get_all_audiences()
demand_content = "Demand文件列表:\n" + "\n".join([f"- {audience['name']}" for audience in audiences])
# 获取参考内容
refer_content = self.prompt_service.get_refer_content("topic")
# 获取景区内容
object_content = ''
if scenic_spots:
object_content = '\n'.join([f"{spot}: {self.prompt_service.get_scenic_spot_info(spot)}" for spot in scenic_spots])
else:
all_spots = self.prompt_service.get_all_scenic_spots()
object_content = "Object信息:\n" + "\n".join([f"- {spot['name']}" for spot in all_spots])
# 获取产品内容
product_content = ''
if products:
product_content = '\n'.join([f"{product}: {self.prompt_service.get_product_info(product)}" for product in products])
else:
product_content = '' # 假设没有默认产品列表
# 获取景区信息列表
spots = self.prompt_service.get_all_scenic_spots()
object_content = "Object信息:\n" + "\n".join([f"- {spot['name']}" for spot in spots])
# 构建系统提示词
system_prompt = template.get_system_prompt()
@ -308,11 +194,10 @@ class PromptBuilderService:
# 构建创作资料
creative_materials = (
f"你拥有的创作资料如下:\n"
f"风格信息:\n{style_content}\n\n"
f"受众信息:\n{demand_content}\n\n"
f"参考内容:\n{refer_content}\n\n"
f"景区信息:\n{object_content}\n\n"
f"产品信息:\n{product_content}"
f"{style_content}\n\n"
f"{demand_content}\n\n"
f"{refer_content}\n\n"
f"{object_content}"
)
# 构建用户提示词
@ -371,115 +256,4 @@ class PromptBuilderService:
refer_content=refer_content
)
return system_prompt, user_prompt
def build_judge_prompt_with_params(self, topic: Dict[str, Any], content: Dict[str, Any],
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None) -> Tuple[str, str]:
"""
使用额外参数构建内容审核提示词
Args:
topic: 选题信息
content: 生成的内容
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
Returns:
系统提示词和用户提示词的元组
"""
# 获取内容生成配置
content_config = self._ensure_content_config()
# 从配置中获取审核提示词模板路径
system_prompt_path = content_config.judger_system_prompt
user_prompt_path = content_config.judger_user_prompt
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 获取景区信息
object_content = ''
if scenic_spots:
object_content = '\n'.join([f"{spot}: {self.prompt_service.get_scenic_spot_info(spot)}" for spot in scenic_spots])
else:
object_name = topic.get("object", "")
object_content = f"{object_name}\n{self.prompt_service.get_scenic_spot_info(object_name)}"
# 获取产品信息
product_content = ''
if products:
product_content = '\n'.join([f"{product}: {self.prompt_service.get_product_info(product)}" for product in products])
else:
product_name = topic.get("product", "")
product_content = f"{product_name}\n{self.prompt_service.get_product_info(product_name)}"
# 获取参考内容
refer_content = self.prompt_service.get_refer_content("judge")
# 构建系统提示词
system_prompt = template.get_system_prompt()
# 格式化内容
import json
tweet_content = json.dumps(content, ensure_ascii=False, indent=4)
# 构建用户提示词
user_prompt = template.build_user_prompt(
tweet_content=tweet_content,
object_content=object_content,
product_content=product_content,
refer_content=refer_content
)
return system_prompt, user_prompt
def build_judge_prompt_simple(self, topic: Dict[str, Any], content: Dict[str, Any]) -> Tuple[str, str]:
"""
构建简化的内容审核提示词只需要产品信息景区信息和文章
Args:
topic: 选题信息
content: 生成的内容
Returns:
系统提示词和用户提示词的元组
"""
# 获取内容生成配置
content_config = self._ensure_content_config()
# 从配置中获取审核提示词模板路径
system_prompt_path = content_config.judger_system_prompt
user_prompt_path = content_config.judger_user_prompt
# 创建提示词模板
template = PromptTemplate(system_prompt_path, user_prompt_path)
# 获取景区信息
object_name = topic.get("object", "")
object_content = self.prompt_service.get_scenic_spot_info(object_name)
# 获取产品信息
product_name = topic.get("product", "")
product_content = self.prompt_service.get_product_info(product_name)
# 构建系统提示词
system_prompt = template.get_system_prompt()
# 格式化内容
import json
tweet_content = json.dumps(content, ensure_ascii=False, indent=4)
# 构建用户提示词(简化版,不包含参考内容)
user_prompt = template.build_user_prompt(
tweet_content=tweet_content,
object_content=object_content,
product_content=product_content,
refer_content="" # 简化版不使用参考内容
)
return system_prompt, user_prompt

View File

@ -342,36 +342,13 @@ class PromptService:
full_path = self._get_full_path(path_str)
if full_path.exists() and full_path.is_file():
if full_path.suffix.lower() == '.json':
# 处理JSON文件
with open(full_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict) and 'examples' in data:
examples = data['examples']
if isinstance(examples, list):
sample_size = max(1, int(len(examples) * ref_item.sampling_rate))
sampled_examples = random.sample(examples, sample_size)
sampled_content = json.dumps({'examples': sampled_examples}, ensure_ascii=False, indent=4)
elif isinstance(data, list):
sample_size = max(1, int(len(data) * ref_item.sampling_rate))
sampled_examples = random.sample(data, sample_size)
sampled_content = json.dumps(sampled_examples, ensure_ascii=False, indent=4)
else:
# 如果不是预期结构,按原方式处理
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
sample_size = max(1, int(len(lines) * ref_item.sampling_rate))
sampled_lines = random.sample(lines, sample_size)
sampled_content = ''.join(sampled_lines)
else:
# 非JSON文件按原方式处理
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
if lines:
sample_size = max(1, int(len(lines) * ref_item.sampling_rate))
sampled_lines = random.sample(lines, sample_size)
sampled_content = ''.join(sampled_lines)
refer_content += f"--- {full_path.name} (sampled {ref_item.sampling_rate * 100}%) ---\n{sampled_content}\n\n"
refer_content += f"--- {full_path.name} (sampled {ref_item.sampling_rate * 100}%) ---\n{sampled_content}\n\n"
except Exception as e:
logger.error(f"读取或采样参考文件失败 {ref_item.path}: {e}")
except Exception as e:

View File

@ -48,41 +48,32 @@ class TweetService:
self.prompt_service = PromptService(config_manager)
self.prompt_builder = PromptBuilderService(config_manager, self.prompt_service)
async def generate_topics(self, dates: Optional[str] = None, num_topics: int = 5,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None) -> Tuple[str, List[Dict[str, Any]]]:
async def generate_topics(self, date: str, num_topics: int = 5,
style: Optional[str] = None,
target_audience: Optional[str] = None) -> Tuple[str, List[Dict[str, Any]]]:
"""
生成选题
Args:
dates: 日期字符串可能为单个日期多个日期用逗号分隔或范围
date: 选题日期格式为YYYY-MM-DD
num_topics: 要生成的选题数量
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
style: 内容风格
target_audience: 目标受众
Returns:
请求ID和生成的选题列表
"""
logger.info(f"开始生成选题,日期: {dates}, 数量: {num_topics}")
logger.info(f"开始生成选题,日期: {date}, 数量: {num_topics}")
# 获取并更新配置
topic_config = self.config_manager.get_config('topic_gen', GenerateTopicConfig)
if dates:
topic_config.topic.date = dates
topic_config.topic.date = date
topic_config.topic.num = num_topics
# 使用PromptBuilderService构建提示词
system_prompt, user_prompt = self.prompt_builder.build_topic_prompt(
products=products,
scenic_spots=scenic_spots,
styles=styles,
audiences=audiences,
dates=dates,
num_topics=num_topics
num_topics=num_topics,
month=date
)
# 使用预构建的提示词生成选题
@ -97,68 +88,24 @@ class TweetService:
logger.info(f"选题生成完成请求ID: {request_id}, 数量: {len(topics)}")
return request_id, topics
async def generate_content(self, topic: Optional[Dict[str, Any]] = None,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
auto_judge: bool = False) -> Tuple[str, str, Dict[str, Any]]:
async def generate_content(self, topic: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any]]:
"""
为选题生成内容
Args:
topic: 选题信息
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
auto_judge: 是否自动进行内容审核
Returns:
请求ID选题索引和生成的内容如果启用审核则返回审核后的内容
请求ID选题索引和生成的内容
"""
# 如果没有提供topic创建一个基础的topic
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
topic_index = topic.get('index', 'unknown')
logger.info(f"开始为选题 {topic_index} 生成内容{'(含审核)' if auto_judge else ''}")
# 创建topic的副本并应用覆盖参数
enhanced_topic = topic.copy()
if styles and len(styles) > 0:
enhanced_topic['style'] = styles[0] # 使用第一个风格
if audiences and len(audiences) > 0:
enhanced_topic['target_audience'] = audiences[0] # 使用第一个受众
if scenic_spots and len(scenic_spots) > 0:
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
if products and len(products) > 0:
enhanced_topic['product'] = products[0] # 使用第一个产品
logger.info(f"开始为选题 {topic_index} 生成内容")
# 使用PromptBuilderService构建提示词
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(enhanced_topic, "content")
system_prompt, user_prompt = self.prompt_builder.build_content_prompt(topic, "content")
# 使用预构建的提示词生成内容
content = await self.content_generator.generate_content_with_prompt(enhanced_topic, system_prompt, user_prompt)
# 如果启用自动审核,则进行内嵌审核
if auto_judge:
logger.info(f"开始对选题 {topic_index} 的内容进行内嵌审核")
try:
# 构建简化的审核提示词(只需要产品信息、景区信息和文章)
judge_system_prompt, judge_user_prompt = self.prompt_builder.build_judge_prompt_simple(enhanced_topic, content)
# 进行审核
judged_content = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, judge_system_prompt, judge_user_prompt)
if judged_content.get('judge_success', False):
logger.info(f"选题 {topic_index} 内容审核成功,使用审核后的内容")
content = judged_content
else:
logger.warning(f"选题 {topic_index} 内容审核失败,使用原始内容")
except Exception as e:
logger.error(f"选题 {topic_index} 内嵌审核失败: {e},使用原始内容")
content = await self.content_generator.generate_content_with_prompt(topic, system_prompt, user_prompt)
# 生成请求ID
request_id = f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
@ -190,52 +137,25 @@ class TweetService:
logger.info(f"内容生成完成请求ID: {request_id}, 选题索引: {topic_index}")
return request_id, topic_index, content
async def judge_content(self, topic: Optional[Dict[str, Any]] = None, content: Dict[str, Any] = {},
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None) -> Tuple[str, str, Dict[str, Any], bool]:
async def judge_content(self, topic: Dict[str, Any], content: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any], bool]:
"""
审核内容
Args:
topic: 选题信息
content: 要审核的内容
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
Returns:
请求ID选题索引审核后的内容和审核是否成功
"""
# 如果没有提供topic创建一个基础的topic
if not topic:
topic = {"index": "1", "date": "2024-07-01"}
# 如果没有提供content返回错误
if not content:
content = {"title": "未提供内容", "content": "未提供内容"}
topic_index = topic.get('index', 'unknown')
logger.info(f"开始审核选题 {topic_index} 的内容")
# 创建topic的副本并应用覆盖参数
enhanced_topic = topic.copy()
if styles and len(styles) > 0:
enhanced_topic['style'] = styles[0] # 使用第一个风格
if audiences and len(audiences) > 0:
enhanced_topic['target_audience'] = audiences[0] # 使用第一个受众
if scenic_spots and len(scenic_spots) > 0:
enhanced_topic['object'] = scenic_spots[0] # 使用第一个景区
if products and len(products) > 0:
enhanced_topic['product'] = products[0] # 使用第一个产品
# 使用PromptBuilderService构建提示词
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(enhanced_topic, content)
system_prompt, user_prompt = self.prompt_builder.build_judge_prompt(topic, content)
# 审核内容
judged_data = await self.content_judger.judge_content_with_prompt(content, enhanced_topic, system_prompt, user_prompt)
judged_data = await self.content_judger.judge_content_with_prompt(content, topic, system_prompt, user_prompt)
judge_success = judged_data.get('judge_success', False)
# 生成请求ID
@ -244,53 +164,47 @@ class TweetService:
logger.info(f"内容审核完成请求ID: {request_id}, 选题索引: {topic_index}, 审核结果: {judge_success}")
return request_id, topic_index, judged_data, judge_success
async def run_pipeline(self, dates: Optional[str] = None, num_topics: int = 5,
styles: Optional[List[str]] = None,
audiences: Optional[List[str]] = None,
scenic_spots: Optional[List[str]] = None,
products: Optional[List[str]] = None,
skip_judge: bool = False,
auto_judge: bool = False) -> Tuple[str, List[Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]:
async def run_pipeline(self, date: str, num_topics: int = 5,
style: Optional[str] = None,
target_audience: Optional[str] = None,
skip_judge: bool = False) -> Tuple[str, List[Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]:
"""
运行完整流水线
Args:
dates: 日期字符串可能为单个日期多个日期用逗号分隔或范围
date: 选题日期格式为YYYY-MM-DD
num_topics: 要生成的选题数量
styles: 风格列表
audiences: 受众列表
scenic_spots: 景区列表
products: 产品列表
skip_judge: 是否跳过内容审核步骤与auto_judge互斥
auto_judge: 是否在内容生成时进行内嵌审核
style: 内容风格
target_audience: 目标受众
skip_judge: 是否跳过内容审核步骤
Returns:
请求ID生成的选题列表生成的内容和审核后的内容
"""
logger.info(f"开始运行完整流水线,日期: {dates}, 数量: {num_topics}, 内嵌审核: {auto_judge}")
logger.info(f"开始运行完整流水线,日期: {date}, 数量: {num_topics}")
# 生成请求ID
request_id = f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{str(uuid.uuid4())[:8]}"
# 步骤1: 生成选题
_, topics = await self.generate_topics(dates, num_topics, styles, audiences, scenic_spots, products)
_, topics = await self.generate_topics(date, num_topics, style, target_audience)
if not topics:
logger.error("未能生成任何选题,流程终止")
return request_id, [], {}, {}
# 步骤2: 为每个选题生成内容(可选择内嵌审核)
# 步骤2: 为每个选题生成内容
contents = {}
for topic in topics:
topic_index = topic.get('index', 'unknown')
_, _, content = await self.generate_content(topic, auto_judge=auto_judge)
_, _, content = await self.generate_content(topic)
contents[topic_index] = content
# 如果使用内嵌审核或跳过审核,直接返回结果
if auto_judge or skip_judge:
logger.info(f"{'使用内嵌审核' if auto_judge else '跳过内容审核步骤'}流水线完成请求ID: {request_id}")
# 如果跳过审核,直接返回结果
if skip_judge:
logger.info(f"跳过内容审核步骤流水线完成请求ID: {request_id}")
return request_id, topics, contents, contents
# 步骤3: 独立审核内容(仅在未使用内嵌审核且未跳过审核时执行)
# 步骤3: 审核内容
judged_contents = {}
for topic_index, content in contents.items():
topic = next((t for t in topics if t.get('index') == topic_index), None)

View File

@ -128,7 +128,7 @@ class ConfigManager:
Args:
name: 配置名称
Returns:
原始配置数据字典
"""

View File

@ -1,281 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
统一配置管理器
"""
import json
import os
import logging
from pathlib import Path
from typing import Dict, Type, TypeVar, Optional, Any, cast, List, Set
from core.config.models import (
BaseConfig, AIModelConfig, SystemConfig, GenerateTopicConfig, ResourceConfig,
GenerateContentConfig, PosterConfig, ContentConfig
)
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseConfig)
class ConfigManager:
"""
统一配置管理器
负责加载、管理和访问所有配置
"""
# 服务端必要的全局配置
SERVER_CONFIGS = {'system', 'ai_model', 'database'}
# 单次生成任务的配置
TASK_CONFIGS = {'topic_gen', 'content_gen', 'poster_gen', 'resource'}
def __init__(self):
self._configs: Dict[str, BaseConfig] = {}
self._raw_configs: Dict[str, Dict[str, Any]] = {} # 存储原始配置数据
self.config_dir: Optional[Path] = None
self.config_objects = {
'ai_model': AIModelConfig(),
'system': SystemConfig(),
'resource': ResourceConfig()
}
self._loaded_configs: Set[str] = set()
def load_from_directory(self, config_dir: str, server_mode: bool = False):
"""
从目录加载配置
Args:
config_dir: 配置文件目录
server_mode: 是否为服务器模式,如果是则只加载必要的全局配置
"""
self.config_dir = Path(config_dir)
if not self.config_dir.is_dir():
logger.error(f"配置目录不存在: {config_dir}")
raise FileNotFoundError(f"配置目录不存在: {config_dir}")
# 注册所有已知的配置类型
self._register_configs()
# 动态加载目录中的所有.json文件
self._load_all_configs_from_dir(server_mode)
def _register_configs(self):
"""注册所有配置"""
self.register_config('ai_model', AIModelConfig)
self.register_config('system', SystemConfig)
self.register_config('resource', ResourceConfig)
# 这些配置在服务器模式下不会自动加载,但仍然需要注册类型
self.register_config('poster', PosterConfig)
self.register_config('content', ContentConfig)
self.register_config('topic_gen', GenerateTopicConfig)
self.register_config('content_gen', GenerateContentConfig)
def register_config(self, name: str, config_class: Type[T]) -> None:
"""
注册一个配置类
Args:
name: 配置名称
config_class: 配置类 (必须是 BaseConfig 的子类)
"""
if not issubclass(config_class, BaseConfig):
raise TypeError("config_class must be a subclass of BaseConfig")
if name not in self._configs:
self._configs[name] = config_class()
def get_config(self, name: str, config_class: Type[T]) -> T:
"""
获取配置实例
Args:
name: 配置名称
config_class: 配置类 (用于类型提示)
Returns:
配置实例
"""
config = self._configs.get(name)
if config is None:
# 如果配置不存在,先注册一个默认实例
self.register_config(name, config_class)
config = self._configs.get(name)
# 确保配置是正确的类型
if not isinstance(config, config_class):
# 尝试转换配置
try:
if isinstance(config, BaseConfig):
# 将现有配置转换为请求的类型
new_config = config_class(**config.model_dump())
self._configs[name] = new_config
config = new_config
else:
raise TypeError(f"Configuration '{name}' is not of type '{config_class.__name__}'")
except Exception as e:
logger.error(f"转换配置 '{name}' 到类型 '{config_class.__name__}' 失败: {e}")
raise TypeError(f"Configuration '{name}' is not of type '{config_class.__name__}'") from e
return cast(T, config)
def get_raw_config(self, name: str) -> Dict[str, Any]:
"""
获取原始配置数据
Args:
name: 配置名称
Returns:
原始配置数据字典
"""
if name in self._raw_configs:
return self._raw_configs[name]
# 如果没有原始配置,但有对象配置,则转换为字典
if name in self._configs:
return self._configs[name].to_dict()
# 尝试从文件加载
if self.config_dir:
config_path = self.config_dir / f"{name}.json"
if config_path.exists():
try:
with open(config_path, 'r', encoding='utf-8') as f:
raw_config = json.load(f)
self._raw_configs[name] = raw_config
return raw_config
except Exception as e:
logger.error(f"加载原始配置 '{name}' 失败: {e}")
# 返回空字典
return {}
def _load_all_configs_from_dir(self, server_mode: bool = False):
"""
动态加载目录中的所有.json文件
Args:
server_mode: 是否为服务器模式,如果是则只加载必要的全局配置
"""
try:
# 遍历并加载目录中所有其他的 .json 文件
for config_path in self.config_dir.glob('*.json'):
config_name = config_path.stem # 'topic_gen.json' -> 'topic_gen'
# 服务器模式下,只加载必要的全局配置
if server_mode and config_name not in self.SERVER_CONFIGS:
logger.info(f"服务器模式下跳过非全局配置: {config_name}")
continue
# 加载原始配置
with open(config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
self._raw_configs[config_name] = config_data
# 更新对象配置
if config_name in self._configs:
logger.info(f"加载配置文件 '{config_name}': {config_path}")
self._configs[config_name].update(config_data)
self._loaded_configs.add(config_name)
else:
logger.info(f"加载原始配置 '{config_name}': {config_path}")
# 最后应用环境变量覆盖
self._apply_env_overrides()
except Exception as e:
logger.error(f"从目录 '{self.config_dir}' 加载配置失败: {e}", exc_info=True)
raise
def load_task_config(self, config_name: str) -> bool:
"""
按需加载任务配置
Args:
config_name: 配置名称
Returns:
是否成功加载
"""
if config_name in self._loaded_configs:
return True
if self.config_dir:
config_path = self.config_dir / f"{config_name}.json"
if config_path.exists():
try:
with open(config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
self._raw_configs[config_name] = config_data
if config_name in self._configs:
self._configs[config_name].update(config_data)
self._loaded_configs.add(config_name)
logger.info(f"按需加载任务配置 '{config_name}': {config_path}")
return True
except Exception as e:
logger.error(f"加载任务配置 '{config_name}' 失败: {e}")
logger.warning(f"未找到任务配置: {config_name}")
return False
def _apply_env_overrides(self):
"""应用环境变量覆盖"""
logger.info("应用环境变量覆盖...")
# 示例: AI模型配置环境变量覆盖
ai_model_config = self.get_config('ai_model', AIModelConfig)
if not ai_model_config: return # 如果没有AI配置则跳过
env_mapping = {
'AI_MODEL': 'model',
'API_URL': 'api_url',
'API_KEY': 'api_key'
}
update_data = {}
for env_var, config_key in env_mapping.items():
if os.getenv(env_var):
update_data[config_key] = os.getenv(env_var)
if update_data:
ai_model_config.update(update_data)
# 更新原始配置
if 'ai_model' in self._raw_configs:
for key, value in update_data.items():
self._raw_configs['ai_model'][key] = value
logger.info(f"通过环境变量更新了AI模型配置: {list(update_data.keys())}")
def save_config(self, name: str):
"""
保存指定的配置到文件
Args:
name: 要保存的配置名称
"""
if not self.config_dir:
raise ValueError("配置目录未设置,无法保存文件")
path = self.config_dir / f"{name}.json"
config = self.get_config(name, BaseConfig)
config_data = config.to_dict()
# 更新原始配置
self._raw_configs[name] = config_data
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=4, ensure_ascii=False)
logger.info(f"配置 '{name}' 已保存到 {path}")
except Exception as e:
logger.error(f"保存配置 '{name}' 到 {path} 失败: {e}", exc_info=True)
raise
# 全局配置管理器实例
config_manager = ConfigManager()
def get_config_manager() -> ConfigManager:
return config_manager

View File

@ -130,7 +130,7 @@ class GenerateTopicConfig(BaseConfig):
class GenerateContentConfig(BaseConfig):
"""内容生成配置"""
content_system_prompt: str = "resource/prompt/generateContent/system.txt"
content_system_prompt: str = "resource/prompt/generateContent/contentSystem.txt"
content_user_prompt: str = "resource/prompt/generateContent/user.txt"
judger_system_prompt: str = "resource/prompt/judgeContent/system.txt"
judger_user_prompt: str = "resource/prompt/judgeContent/user.txt"