286 lines
15 KiB
Python

import argparse
import json
import logging
import os
import random
import sys
import time
import traceback
from datetime import datetime

from core.ai_agent import AI_Agent
from core.topic_parser import TopicParser  # Re-enabled: required by the --topics_file path in main().
import core.contentGen as contentGen
import core.posterGen as posterGen
import core.simple_collage as simple_collage
from utils.resource_loader import ResourceLoader
from utils.prompt_manager import PromptManager
from utils.tweet_generator import (
    run_topic_generation_pipeline,
    generate_content_for_topic,
    generate_posters_for_topic,
    tweetTopicRecord,  # NOTE(review): assumed to live in utils.tweet_generator -- verify module path.
)
def load_config(config_path="poster_gen_config.json"):
    """Load and validate the pipeline configuration from a JSON file.

    Args:
        config_path: Path to the JSON configuration file.

    Returns:
        dict: The parsed configuration (guaranteed to contain every
        required key).

    Exits the process with status 1 if the file is missing, is not valid
    JSON, or lacks any required key -- callers may therefore assume a
    complete config on return.
    """
    required_keys = (
        "api_url", "model", "api_key", "resource_dir", "prompts_dir",
        "output_dir", "num", "variants", "topic_system_prompt",
        "topic_user_prompt", "content_system_prompt", "image_base_dir",
    )
    if not os.path.exists(config_path):
        # Use logging (not print) for consistency with the rest of the file.
        logging.critical(f"Configuration file '{config_path}' not found.")
        logging.critical("Please copy 'example_config.json' to 'poster_gen_config.json' and customize it.")
        sys.exit(1)
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        # Single pass over the required keys (the original checked all()
        # and then re-scanned to build the missing list).
        missing_keys = [key for key in required_keys if key not in config]
        if missing_keys:
            logging.critical(f"Config file '{config_path}' is missing required keys: {missing_keys}")
            sys.exit(1)
        # Paths in the config are assumed to be relative to the project
        # root or absolute; no resolution is performed here.
        return config
    except json.JSONDecodeError:
        logging.critical(f"Could not decode JSON from '{config_path}'. Check the file format.")
        sys.exit(1)
    except Exception as e:
        logging.critical(f"Error loading configuration from '{config_path}': {e}")
        sys.exit(1)
# Removed generate_topics_step function definition from here
# Its logic is now in utils.tweet_generator.run_topic_generation_pipeline
# --- Main Orchestration Step (Remains in main.py) ---
def generate_content_and_posters_step(config, run_id, tweet_topic_record):
    """Step 2: generate tweet content and posters for every topic in the record.

    Args:
        config: Validated configuration dict (see load_config).
        run_id: Identifier for this pipeline run; used to build output paths.
        tweet_topic_record: Record holding the parsed topics_list from Step 1.

    Returns:
        bool: True if content generation succeeded and the poster step was
        attempted for at least one topic; False if there was nothing to
        process or a fatal error occurred.
    """
    if not tweet_topic_record or not tweet_topic_record.topics_list:
        logging.warning("Skipping content/poster generation: No valid topics found in the record.")
        return False

    logging.info(f"Starting Step 2: Content and Poster Generation for run_id: {run_id}")
    logging.info(f"Processing {len(tweet_topic_record.topics_list)} topics...")

    success_flag = False
    prompt_manager = PromptManager(config)
    ai_agent = None
    try:
        # Timeout/retry settings are optional in the config; fall back to defaults.
        request_timeout = config.get("request_timeout", 30)
        max_retries = config.get("max_retries", 3)
        ai_agent = AI_Agent(
            config["api_url"],
            config["model"],
            config["api_key"],
            timeout=request_timeout,
            max_retries=max_retries
        )
        logging.info("AI Agent for content generation initialized.")

        for i, topic_item in enumerate(tweet_topic_record.topics_list):
            # Prefer the index parsed from the model output; fall back to position.
            topic_index = topic_item.get('index', i + 1)
            logging.info(f"--- Processing Topic {topic_index}/{len(tweet_topic_record.topics_list)}: {topic_item.get('object', 'N/A')} ---")

            tweet_content_list = generate_content_for_topic(
                ai_agent, prompt_manager, config, topic_item, config["output_dir"], run_id, topic_index
            )
            if tweet_content_list:
                logging.info(f"Content generation successful for Topic {topic_index}.")
                # posters_attempted is True when the poster step ran, even if no
                # individual poster was successfully created.
                posters_attempted = generate_posters_for_topic(
                    config, topic_item, tweet_content_list, config["output_dir"], run_id, topic_index
                )
                if posters_attempted:
                    logging.info(f"Poster generation process completed for Topic {topic_index}.")
                    success_flag = True
                else:
                    logging.warning(f"Poster generation skipped or failed early for Topic {topic_index}.")
            else:
                logging.warning(f"Content generation failed or yielded no valid results for Topic {topic_index}. Skipping posters.")
            logging.info(f"--- Finished Topic {topic_index} ---")
    except KeyError as e:
        logging.error(f"Configuration error during content/poster generation: Missing key '{e}'")
        traceback.print_exc()
        return False
    except Exception:
        # logging.exception records the traceback at ERROR level.
        logging.exception("An unexpected error occurred during content and poster generation:")
        return False
    finally:
        # Always release the agent's underlying connection, even on error paths.
        if ai_agent:
            logging.info("Closing content generation AI Agent...")
            ai_agent.close()

    if success_flag:
        logging.info("Step 2: Content and Poster Generation completed (at least partially).")
    else:
        logging.warning("Step 2: Content and Poster Generation completed, but no content/posters were successfully generated or attempted.")
    return success_flag
def _infer_run_id_from_filename(topics_file):
    """Best-effort extraction of a run_id from a 'tweet_topic_{run_id}.json' filename.

    Returns the inferred run_id string, or None if it cannot be determined.
    """
    try:
        base = os.path.basename(topics_file)
        if base.startswith("tweet_topic_") and base.endswith(".json"):
            run_id = base[len("tweet_topic_"): -len(".json")]
            logging.info(f"Inferred run_id from topics filename: {run_id}")
            return run_id
        if base == "tweet_topic.json":
            # Default filename carries no run_id; caller will generate one.
            logging.warning(f"Loaded topics from default filename '{base}'. Run ID not inferred.")
        else:
            logging.warning(f"Could not infer run_id from topics filename: {base}")
    except Exception as e:
        logging.warning(f"Error trying to infer run_id from topics filename: {e}")
    return None


def _load_existing_topics(config, topics_file, run_id):
    """Load topics from a pre-generated JSON file, skipping Step 1.

    Returns (run_id, tweet_topic_record). Exits the process with status 1
    if the topics file cannot be loaded.
    """
    logging.info(f"Skipping Topic Generation (Step 1) - Loading topics from: {topics_file}")
    topics_list = TopicParser.load_topics_from_json(topics_file)
    if not topics_list:
        logging.error(f"Failed to load topics from {topics_file}. Cannot proceed.")
        sys.exit(1)
    if not run_id:
        run_id = _infer_run_id_from_filename(topics_file)
    if run_id is None:
        run_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_loaded")
        logging.info(f"Generated run_id for loaded topics: {run_id}")
    # Build a minimal record; the topic/content prompts are unknown when
    # loading from a file, so they are left empty.
    output_dir = config.get("output_dir", "result")
    record = tweetTopicRecord(topics_list, "", "", output_dir, run_id)
    logging.info(f"Successfully loaded {len(topics_list)} topics for run_id: {run_id}")
    return run_id, record


def main():
    """CLI entry point: configure logging, parse arguments, load config, and
    run the two-step pipeline (topic generation, then content + posters)."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    parser = argparse.ArgumentParser(description="Travel Content Creator Pipeline")
    parser.add_argument(
        "--config",
        type=str,
        default="poster_gen_config.json",
        help="Path to the configuration file (e.g., poster_gen_config.json)"
    )
    parser.add_argument(
        "--run_id",
        type=str,
        default=None,
        help="Optional specific run ID (e.g., 'my_test_run_01'). If not provided, a timestamp-based ID will be generated."
    )
    parser.add_argument(
        "--topics_file",
        type=str,
        default=None,
        help="Optional path to a pre-generated topics JSON file. If provided, skips topic generation (Step 1)."
    )
    parser.add_argument(
        "--debug",
        action='store_true',
        help="Enable debug level logging."
    )
    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        logging.info("Debug logging enabled.")

    logging.info("Starting Travel Content Creator Pipeline...")
    logging.info(f"Using configuration file: {args.config}")
    if args.run_id:
        logging.info(f"Using specific run_id: {args.run_id}")
    if args.topics_file:
        logging.info(f"Using existing topics file: {args.topics_file}")

    config = load_config(args.config)
    if config is None:
        # Defensive: load_config exits on failure, but guard anyway.
        logging.critical("Failed to load configuration. Exiting.")
        sys.exit(1)

    pipeline_start_time = time.time()

    # --- Step 1: Topic generation (or load a pre-generated topics file) ---
    if args.topics_file:
        run_id, tweet_topic_record = _load_existing_topics(config, args.topics_file, args.run_id)
    else:
        logging.info("Executing Topic Generation (Step 1)...")
        step1_start = time.time()
        run_id, tweet_topic_record = run_topic_generation_pipeline(config, args.run_id)
        step1_end = time.time()
        if run_id and tweet_topic_record:
            logging.info(f"Step 1 completed successfully in {step1_end - step1_start:.2f} seconds. Run ID: {run_id}")
        else:
            logging.critical("Topic Generation (Step 1) failed. Exiting.")
            sys.exit(1)

    # --- Step 2: Content & poster generation ---
    if run_id and tweet_topic_record:
        logging.info("Executing Content and Poster Generation (Step 2)...")
        step2_start = time.time()
        success = generate_content_and_posters_step(config, run_id, tweet_topic_record)
        step2_end = time.time()
        if success:
            logging.info(f"Step 2 completed in {step2_end - step2_start:.2f} seconds.")
        else:
            logging.warning("Step 2 finished, but may have encountered errors or generated no output.")
    else:
        logging.error("Cannot proceed to Step 2: Invalid run_id or tweet_topic_record from Step 1.")

    pipeline_end_time = time.time()
    logging.info(f"Pipeline finished. Total execution time: {pipeline_end_time - pipeline_start_time:.2f} seconds.")
    logging.info(f"Results for run_id '{run_id}' are in: {os.path.join(config.get('output_dir', 'result'), run_id)}")
# Script entry point: run the full pipeline when executed directly.
if __name__ == "__main__":
    main()