海外访问:www.kdjingpai.com
Ctrl + D 收藏本站
当前位置:首页 » AI实操教程

AI 驱动的内容创作:在规范与创意之间寻找平衡

2025-06-29 26

对于许多内容创作者而言,将原始、枯燥的 RSS 新闻源转化为一篇结构清晰、观点独到且风格一致的专栏文章,是一项耗时费力的工作。信息洪流每日涌来,但真正将其打磨成吸引读者的内容,往往需要在“机械搬运”与“创意枯竭”之间挣扎。

本文将探讨一种自动化工具的构建思路,它能将这一过程变得高效而智能。这个工具的核心目的很明确:将 RSS 订阅源中未经加工的信息,通过大型语言模型(LLM)的处理,重塑为适合发布的高质量文章。

控制创意的“温度”

RSS 订阅源就像一个忠实的信使,它准确地传递信息,但缺乏解读和情感。而一个优秀的自动化工具,则应扮演专栏作家的角色,不仅呈现事实,更能赋予其观点和生命力。

实现这一点的关键在于对 AI 模型创造性的精确控制。通过调整 temperature 参数,用户可以驾驭模型的输出风格。这是一个范围通常在 0 到 1 之间的值,用于调节模型预测的随机性。较低的 temperature 值(如 0.2)会使输出更具确定性和一致性,适合需要严谨、事实驱动的报道;而较高的值(如 0.8)则会激发模型的“想象力”,产生更多样化、更具创意的表达方式,适合需要生动比喻和通俗解释的科普内容。

“三位一体”的写作助手

为了满足不同场景的需求,该工具内置了三种截然不同的“写作人格”,每一种都代表着严谨与创意的不同平衡点:

  • 日常简报人格: 模拟科技博主的亲切口吻,将每日的 AI 新闻串联成轻松的简报。它在确保核心信息准确的同时,会使用生活化的比喻和适当的 emoji,拉近与读者的距离。
  • 深度分析人格: 切换至行业分析师的视角,深入剖析新闻背后的技术本质与市场影响。它倾向于更低的 temperature,以保证输出的专业性与逻辑性,同时在框架内形成独到见解。
  • 小白翻译人格: 它的核心任务是化繁为简。例如,当遇到“Mixture of Experts (MoE)”这样的专业术语时,它不会直接抛给读者,而是将其比喻为“一个专家团队协同工作,每个专家只负责自己最擅长的一小部分问题,以此提升整体效率和准确性”,让零基础的读者也能轻松理解。

这三种人格确保了输出内容在风格上可以灵活多变,但在文章结构上保持一致,帮助读者形成稳定的阅读预期。

稳健运行的技术基石

一个真正可用的工具,不能仅仅是 API 的简单调用,其背后需要一套稳健的工程设计来确保流程的顺畅与可靠。

  • 自动重试机制: 网络波动或 API 临时性故障是常见问题。通过引入指数退避的重试机制,系统可以在遭遇失败后自动等待一段时间再重新尝试,极大提升了任务成功率。
  • 缓存系统: 对于更新频率不高的 RSS 源,反复抓取相同内容是一种资源浪费。一个简单的缓存系统可以暂存已获取的数据,在设定的时间内直接从本地读取,有效降低了网络请求和处理时间。
  • 流式输出 (Streaming Output): 无需等待模型生成全部内容,流式输出可以像视频加载一样,实时地将生成的文字逐字或逐句显示出来。这不仅让用户能即时预览内容质量,也方便在发现方向偏离时及时中止任务。
  • 灵活配置: 为了适应不同环境,工具支持通过命令行参数、环境变量和配置文件等多种方式进行设置,用户无需修改源代码即可调整模型、API 密钥或输出目录等。
  • 周全的异常处理: 从容应对各种意外,如 API 密钥缺失、RSS 地址无法访问等,保证系统在任何情况下都能清晰地报告错误,而不是直接崩溃。

实践中的效果

设想一下,当 Google 发布了名为 Gemini 2.5 Pro 并带有 DeepThink 推理功能的新模型时,经过“小白翻译人格”的处理,它可能会被解释成:

Gemini 2.5 Pro 就像一个极其聪明的 AI 伙伴,可以帮你处理各种任务。而新增的 DeepThink 模式,则赋予了它深度思考的能力。这意味着它不再仅仅回答“是什么”,更能清晰地解释“为什么是这样”,像一个能带你探究问题本质的良师益友。

这种处理方式,既保留了技术的专业性,又实现了表达的通俗化。它证明了创意并非天马行空的想象,而是在明确规则框架下的有序创新,如同爵士乐的即兴演奏,看似自由奔放,实则每一步都未脱离和声的轨道。

当内容创作者不再需要在“技术深度”与“大众科普”之间做艰难抉择时,技术的价值才真正得以彰显。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import feedparser
import datetime
import requests
import os
import json
import time
import logging
import traceback
import argparse
import configparser
from pathlib import Path
from datetime import timedelta
from functools import wraps
from typing import List, Dict, Any, Optional, Callable, TypeVar, Union
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("ai_news_generator.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("ai_news_generator")
# 类型声明
T = TypeVar('T')
FeedEntry = Dict[str, Any]
ApiResponse = Dict[str, Any]
# 默认配置
DEFAULT_CONFIG = {
"api": {
"base_url": "https://xxxxxxxxxxxxx/openai",
"model": "xxxxxxxxx",
"api_key": os.environ.get("OPENAI_API_KEY", ""),
"max_tokens": 2000,
"timeout": 60,
"temperature": 0.7
},
"rss": {
"url": "https://news.smol.ai/rss.xml",
"days": 7,
"cache_time": 3600  # 缓存RSS内容的时间(秒)
},
"output": {
"directory": "ai_news_output",
"format": "markdown"
}
}
# 重试装饰器
def retry(max_attempts: int = 3, delay: int = 2, backoff: int = 2,
exceptions: tuple = (Exception,)) -> Callable:
"""
重试装饰器,用于处理可能失败的操作
参数:
max_attempts: 最大尝试次数
delay: 初始延迟时间(秒)
backoff: 延迟的倍数(指数退避)
exceptions: 要捕获的异常元组
"""
def decorator(func):
        @wraps(func)
def wrapper(*args, **kwargs):
attempt = 0
current_delay = delay
while attempt < max_attempts:
try:
return func(*args, **kwargs)
except exceptions as e:
attempt += 1
if attempt == max_attempts:
logger.error(f"最大尝试次数已用完 ({max_attempts}),操作失败: {e}")
raise
logger.warning(f"尝试 {attempt}/{max_attempts} 失败: {e}. "
f"将在 {current_delay} 秒后重试...")
time.sleep(current_delay)
current_delay *= backoff
return wrapper
return decorator
class Config:
"""配置管理类"""
def __init__(self, config_file: Optional[str] = None):
"""
初始化配置
参数:
config_file: 配置文件路径,如果不存在则使用默认配置
"""
self.config = DEFAULT_CONFIG.copy()
if config_file and os.path.exists(config_file):
self._load_from_file(config_file)
else:
logger.info("未找到配置文件,使用默认配置")
# 环境变量优先级高于配置文件
if os.environ.get("OPENAI_API_KEY"):
self.config["api"]["api_key"] = os.environ.get("OPENAI_API_KEY")
def _load_from_file(self, config_file: str) -> None:
"""从文件加载配置"""
try:
parser = configparser.ConfigParser()
parser.read(config_file)
# 将配置文件值更新到默认配置
for section in parser.sections():
if section in self.config:
for key, value in parser.items(section):
# 尝试转换类型以匹配默认配置
if key in self.config[section]:
original_type = type(self.config[section][key])
if original_type is int:
self.config[section][key] = int(value)
elif original_type is float:
self.config[section][key] = float(value)
elif original_type is bool:
self.config[section][key] = value.lower() in ("true", "yes", "1")
else:
self.config[section][key] = value
logger.info(f"从 {config_file} 加载配置")
except Exception as e:
logger.error(f"加载配置文件出错: {e}")
def save_config(self, file_path: str) -> None:
"""将当前配置保存到文件"""
try:
parser = configparser.ConfigParser()
for section, options in self.config.items():
parser.add_section(section)
for key, value in options.items():
parser.set(section, key, str(value))
with open(file_path, 'w') as f:
parser.write(f)
logger.info(f"配置已保存到 {file_path}")
except Exception as e:
logger.error(f"保存配置文件出错: {e}")
def get(self, section: str, key: str, default: Any = None) -> Any:
"""获取配置值,如果不存在则返回默认值"""
try:
return self.config[section][key]
except KeyError:
logger.warning(f"配置 {section}.{key} 不存在,使用默认值: {default}")
return default
class RssReader:
"""RSS订阅内容读取类"""
def __init__(self, config: Config):
"""
初始化RSS读取器
参数:
config: 配置对象
"""
self.config = config
self.cache = {}
self.cache_time = {}
    @retry(max_attempts=3, exceptions=(requests.RequestException,))
def fetch_rss_feed(self, url: Optional[str] = None) -> Optional[bytes]:
"""
获取RSS订阅内容
参数:
url: RSS订阅地址,如果为None则使用配置中的地址
返回:
RSS内容或None(如果获取失败)
"""
url = url or self.config.get("rss", "url")
cache_time = self.config.get("rss", "cache_time")
# 检查缓存
if url in self.cache and url in self.cache_time:
if time.time() - self.cache_time[url] < cache_time:
logger.debug(f"使用缓存的RSS内容: {url}")
return self.cache[url]
try:
logger.info(f"获取RSS订阅: {url}")
response = requests.get(url, timeout=10)
response.raise_for_status()
# 更新缓存
self.cache[url] = response.content
self.cache_time[url] = time.time()
return response.content
except requests.exceptions.RequestException as e:
logger.error(f"获取RSS失败: {e}")
raise
def parse_feed(self, content: Optional[bytes]) -> Optional[feedparser.FeedParserDict]:
"""
解析RSS内容
参数:
content: RSS内容
返回:
解析后的Feed对象或None(如果解析失败)
"""
if not content:
logger.error("没有内容可以解析")
return None
try:
return feedparser.parse(content)
except Exception as e:
logger.error(f"解析RSS内容失败: {e}")
return None
def get_recent_entries(self, feed: Optional[feedparser.FeedParserDict],
days: Optional[int] = None) -> List[FeedEntry]:
"""
获取最近n天的订阅内容
参数:
feed: 解析后的Feed对象
days: 天数,如果为None则使用配置中的值
返回:
最近的条目列表
"""
if not feed:
logger.warning("没有Feed可以获取条目")
return []
days = days or self.config.get("rss", "days")
now = datetime.datetime.now()
cutoff_date = now - timedelta(days=days)
recent_entries = []
logger.info(f"获取最近 {days} 天的订阅内容")
for entry in feed.entries:
# 解析发布日期
pub_date = None
if hasattr(entry, 'published_parsed') and entry.published_parsed:
pub_date = datetime.datetime(*entry.published_parsed[:6])
elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
pub_date = datetime.datetime(*entry.updated_parsed[:6])
else:
# 如果没有日期信息,跳过该条目
logger.debug(f"跳过没有日期信息的条目: {entry.get('title', 'Unknown')}")
continue
# 只保留最近n天的内容
if pub_date >= cutoff_date:
recent_entries.append({
'title': entry.title,
'link': entry.link,
'published': pub_date.strftime('%Y-%m-%d %H:%M:%S'),
'summary': entry.summary if hasattr(entry, 'summary') else "无摘要",
})
logger.info(f"找到 {len(recent_entries)} 篇最近的文章")
return recent_entries
def display_entries(self, entries: List[FeedEntry]) -> None:
"""
显示条目内容
参数:
entries: 条目列表
"""
if not entries:
logger.info("没有找到最近的文章")
print("没有找到最近的文章")
return
print(f"找到 {len(entries)} 篇最近的文章:")
print("-" * 80)
for i, entry in enumerate(entries, 1):
print(f"{i}. {entry['title']}")
print(f"   发布时间: {entry['published']}")
print(f"   链接: {entry['link']}")
print(f"   摘要: {entry['summary'][:200]}...")  # 只显示部分摘要
print("-" * 80)
class ContentGenerator:
"""内容生成类"""
def __init__(self, config: Config):
"""
初始化内容生成器
参数:
config: 配置对象
"""
self.config = config
self._load_prompt_templates()
def _load_prompt_templates(self) -> None:
"""加载提示词模板"""
self.prompt_templates = {
"daily": """
你是国内顶尖的AI科技公众号编辑,擅长将复杂技术新闻转化为通俗易懂的内容。
请将提供的AI技术新闻整理成一篇微信公众号"每日AI简报",遵循以下要求:
【内容要求】
1. 使用标题"【每日AI简报】YYYY年MM月DD日",自动替换为当前日期
2. 开头用2-3句话总结今日AI领域的整体趋势或亮点
3. 为每条新闻设计简短醒目的小标题,形式为"【关键词】+核心内容"
4. 每条新闻包含:
- 事件概述(用最简单的话解释发生了什么)
- 为什么重要(对普通用户或行业的影响)
- 相关背景(如必要,2-3句话解释关键技术概念)
【表达风格】
1. 像"科技博主"而非"新闻记者"的语气,亲切自然
2. 使用生动的类比和比喻解释技术概念
3. 适当使用emoji增强表达(每段1-2个,不要过多)
4. 避免专业术语堆砌,必须使用时提供简明解释
5. 用"你"直接对读者说话,增强亲近感
【格式规范】
1. 通篇采用markdown格式
2. 每条新闻之间用分隔线或明显标题区分
3. 重点信息可用加粗、斜体强调
4. 总篇幅控制在1000-1500字之间
5. 结尾添加"感谢阅读,明天见~"和订阅引导
记住:写作目标是让"对AI感兴趣但没有技术背景的普通用户"轻松理解这些技术进展的价值和意义。
""",
"deep": """
你是一位资深AI领域分析师,擅长深入剖析技术进展和市场影响。
请将提供的AI技术新闻整理成一篇微信公众号"AI技术深度解析",遵循以下要求:
【内容架构】
1. 开篇:用简明语言概述本期新闻焦点,指出共同趋势或主题
2. 分析框架:将新闻按技术类别或应用领域分组(如LLM进展、多模态、AI应用等)
3. 每则新闻包含:
- 技术本质解析(这项技术/产品的核心机制是什么)
- 进步点评估(与现有技术相比有何突破)
- 行业影响分析(将如何改变相关行业格局)
- 技术路线判断(代表了什么发展方向)
【深度化处理】
1. 剖析核心技术原理,但使用通俗类比让非专业人士理解
2. 关联行业背景和商业模式,解释为何重要
3. 适当引入相关技术发展历史和竞争格局
4. 对技术发展方向做出有见地的推测
【表达规范】
1. 保持客观专业的分析语气,但避免学术化晦涩表达
2. 使用结构化段落和子标题保证清晰度
3. 复杂概念用图示类比或拆解方式解释
4. 适当引用数据或趋势支持分析
最终成文应当让读者不仅了解"发生了什么",更理解"为什么重要"及"未来走向",体现你的专业洞察。
""",
"beginner": """
你是一位极擅长技术科普的AI科技博主,你的超能力是把最前沿的AI技术解释得让初中生都能理解。
请将提供的AI技术新闻整理成一篇面向完全零基础读者的微信公众号"AI新手村日报",遵循以下要求:
【零门槛原则】
1. 假设读者从未接触过AI/ML相关概念,需要从零开始解释
2. 每个技术术语第一次出现时必须立即用括号给出"小白解释"
3. 使用日常生活中的具体例子和类比解释每个概念
4. 把复杂的技术进展转化为"这对你的生活意味着什么"
【内容结构】
1. 开场白:友好问候并用一句话概括"今天AI界发生了什么有趣的事"
2. 新闻主体:每条新闻使用"你知道吗?"或"想象一下"等引导式开头
3. 每则新闻拆解为:
- 这是什么?(用最简单的类比解释)
- 为什么很酷?(用日常场景展示应用)
- 小贴士:提供1-2个延伸知识点,但保持简单
【表达特色】
1. 使用轻松愉快的对话式语气,仿佛朋友间聊天
2. 丰富使用emoji表情和生动比喻
3. 适当加入幽默元素,让技术内容变得有趣
4. 使用"想象一下..."、"就好比..."等引导式表达
5. 问答形式展开解释,预设读者可能的疑问并回答
【视觉辅助】
1. 建议在正文中穿插使用简单示意图的位置标记
2. 关键概念用粗体标记
3. 使用项目符号和短段落提高可读性
记住:如果一个10岁孩子都能听懂你的解释,那你就成功了!
"""
}
def _prepare_input_content(self, entries: List[FeedEntry]) -> str:
"""
准备输入内容
参数:
entries: 条目列表
返回:
格式化的输入内容
"""
input_content = "以下是最近的AI技术新闻动态,请帮我整理成适合微信公众号的每日AI News推送:\n\n"
for entry in entries:
input_content += f"标题: {entry['title']}\n"
input_content += f"时间: {entry['published']}\n"
input_content += f"链接: {entry['link']}\n"
input_content += f"摘要: {entry['summary']}\n\n"
return input_content
    @retry(max_attempts=3, delay=5, exceptions=(requests.RequestException,))
def _call_api(self, messages: List[Dict[str, str]], stream: bool = False) -> Union[str, requests.Response]:
"""
调用API
参数:
messages: 消息列表
stream: 是否流式输出
返回:
生成的内容或流式响应对象
"""
base_url = self.config.get("api", "base_url")
model = self.config.get("api", "model")
api_key = self.config.get("api", "api_key")
timeout = self.config.get("api", "timeout")
max_tokens = self.config.get("api", "max_tokens")
temperature = self.config.get("api", "temperature")
if not api_key:
raise ValueError("API密钥不能为空")
# 构建请求数据
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": stream
}
# 设置请求头
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
# API 端点
endpoint = f"{base_url}/chat/completions"
logger.info(f"调用 API: {endpoint}, 流式输出: {stream}")
if stream:
response = requests.post(
endpoint,
headers=headers,
json=payload,
stream=True,
timeout=timeout
)
else:
response = requests.post(
endpoint,
headers=headers,
json=payload,
timeout=timeout
)
# 检查响应状态
if response.status_code != 200:
error_msg = f"API 请求失败: 状态码 {response.status_code}, 错误信息: {response.text}"
logger.error(error_msg)
raise requests.RequestException(error_msg)
if stream:
return response
else:
result = response.json()
if "choices" not in result or not result["choices"]:
raise ValueError("API响应格式错误,找不到内容")
return result["choices"][0]["message"]["content"]
def format_with_openai(self, entries: List[FeedEntry], style: str = "daily",
stream: bool = False) -> Optional[str]:
"""
使用OpenAI API对内容进行格式化与润色
参数:
entries: RSS条目列表
style: 输出风格,可选 'daily'(日常简报), 'deep'(深度解析), 'beginner'(小白友好)
stream: 是否使用流式输出
返回:
格式化后的内容或None(如果失败)
"""
if not entries:
logger.warning("没有找到最近的文章可以润色")
return "没有找到最近的文章可以润色"
try:
# 准备输入内容
input_content = self._prepare_input_content(entries)
# 选择对应风格的提示词
system_prompt = self.prompt_templates.get(style, self.prompt_templates["daily"])
# 构建消息
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": input_content}
]
# 调用API
if stream:
# 流式处理
print("正在生成内容,请稍候...")
response = self._call_api(messages, stream=True)
# 处理流式响应
formatted_content = []
client = None
try:
# 先尝试导入sseclient,失败则使用自定义解析
import sseclient
client = sseclient.SSEClient(response)
for event in client.events():
if event.data != "[DONE]":
try:
chunk = json.loads(event.data)
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
print(content, end="")
formatted_content.append(content)
except Exception as e:
logger.warning(f"解析事件失败: {e}")
except ImportError:
# 手动解析SSE
logger.info("未安装sseclient,使用自定义SSE解析")
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
print(content, end="")
formatted_content.append(content)
except Exception as e:
logger.warning(f"解析事件失败: {e}")
formatted_content = "".join(formatted_content)
print("\n\n生成完成!")
else:
# 非流式处理
formatted_content = self._call_api(messages, stream=False)
# 添加元数据
metadata = {
"source": self.config.get("rss", "url"),
"processed_date": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"article_count": len(entries),
"style": style
}
# 添加元数据到内容顶部作为YAML前置元数据
yaml_metadata = "---\n"
for key, value in metadata.items():
yaml_metadata += f"{key}: {value}\n"
yaml_metadata += "---\n\n"
return yaml_metadata + formatted_content
except Exception as e:
logger.error(f"内容生成失败: {e}")
logger.debug(traceback.format_exc())
return None
def save_to_file(self, content: Optional[str], filename: Optional[str] = None) -> Optional[str]:
"""
将内容保存到文件
参数:
content: 要保存的内容
filename: 文件名,如果为None则使用当前日期生成
返回:
保存的文件路径或None(如果失败)
"""
if not content:
logger.warning("没有内容可保存")
return None
# 确保输出目录存在
output_dir = self.config.get("output", "directory")
os.makedirs(output_dir, exist_ok=True)
# 如果未指定文件名,使用当前日期
if not filename:
today = datetime.datetime.now().strftime("%Y-%m-%d")
filename = f"ai_news_{today}.md"
# 确保文件路径
file_path = os.path.join(output_dir, filename)
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.info(f"内容已保存到 {file_path}")
return file_path
except Exception as e:
logger.error(f"保存文件失败: {e}")
return None
def parse_arguments() -> argparse.Namespace:
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="AI News生成器")
parser.add_argument("--config", "-c", type=str, help="配置文件路径")
parser.add_argument("--rss", "-r", type=str, help="RSS订阅地址")
parser.add_argument("--days", "-d", type=int, help="获取最近几天的内容")
parser.add_argument("--style", "-s", type=str, choices=["daily", "deep", "beginner"],
default="daily", help="生成内容的风格")
parser.add_argument("--stream", action="store_true", help="使用流式输出")
parser.add_argument("--output", "-o", type=str, help="输出文件路径")
parser.add_argument("--verbose", "-v", action="store_true", help="显示详细日志")
return parser.parse_args()
def main() -> None:
"""主函数"""
# 解析命令行参数
args = parse_arguments()
# 设置日志级别
if args.verbose:
logger.setLevel(logging.DEBUG)
# 加载配置
config = Config(args.config)
# 如果命令行参数提供了值,则覆盖配置
if args.rss:
config.config["rss"]["url"] = args.rss
if args.days:
config.config["rss"]["days"] = args.days
# 初始化RSS读取器和内容生成器
rss_reader = RssReader(config)
content_generator = ContentGenerator(config)
# 获取RSS内容
rss_url = config.get("rss", "url")
days = config.get("rss", "days")
logger.info(f"开始处理,获取 {rss_url} 最近 {days} 天的内容...")
print(f"正在获取 {rss_url} 最近 {days} 天的内容...")
try:
# 获取并解析RSS
content = rss_reader.fetch_rss_feed()
feed = rss_reader.parse_feed(content)
if not feed:
logger.error("无法解析RSS内容")
print("无法解析RSS内容")
return
# 获取最近的条目
recent_entries = rss_reader.get_recent_entries(feed, days)
rss_reader.display_entries(recent_entries)
if not recent_entries:
logger.warning("没有找到最近的文章")
return
# 如果命令行没有指定风格和流式输出,则交互式询问
style = args.style
stream = args.stream
if not args.style and not sys.argv[1:]:  # 如果没有提供任何命令行参数
print("\n选择内容润色风格:")
print("1. 日常简报风格 (默认,适合一般读者)")
print("2. 深度分析风格 (包含更多技术和市场分析)")
print("3. 小白友好风格 (零基础读者也能轻松理解)")
style_choice = input("请选择 (1-3,默认1): ").strip() or "1"
style_options = {
"1": "daily",
"2": "deep",
"3": "beginner"
}
style = style_options.get(style_choice, "daily")
print("\n是否使用流式输出? (实时显示生成过程)")
print("1. 是 - 实时显示生成过程")
print("2. 否 - 等待完整生成后显示")
stream_choice = input("请选择 (1-2,默认2): ").strip() or "2"
stream = stream_choice == "1"
# 使用OpenAI生成内容
logger.info(f"使用OpenAI进行内容润色 (风格: {style}, 流式输出: {stream})")
print(f"\n正在使用OpenAI进行内容润色 (风格: {style})...")
formatted_content = content_generator.format_with_openai(
recent_entries, style=style, stream=stream
)
if formatted_content:
if not stream:  # 只有非流式处理才需要显示预览
print("\n润色后内容预览 (前500字):")
print("-" * 80)
print(formatted_content[:500] + "...(更多内容已保存到文件)")
print("-" * 80)
# 确定输出文件名
output_file = args.output
if not output_file:
today = datetime.datetime.now().strftime("%Y-%m-%d")
output_file = f"ai_news_{style}_{today}.md"
# 保存到文件
saved_file = content_generator.save_to_file(formatted_content, output_file)
if saved_file:
print(f"完整内容已保存到 {saved_file}")
else:
logger.error("内容生成失败")
print("内容生成失败,请查看日志获取详细信息")
except Exception as e:
logger.error(f"处理过程中出错: {e}")
logger.debug(traceback.format_exc())
print(f"处理过程中出错: {e}")
print("请查看日志获取详细信息")
if __name__ == "__main__":
try:
import sys
main()
except KeyboardInterrupt:
logger.info("用户中断执行")
print("\n程序已中断")
except Exception as e:
logger.critical(f"未捕获的异常: {e}")
logger.debug(traceback.format_exc())
print(f"程序遇到错误: {e}")
sys.exit(1)

相关推荐

找不到AI工具?在这试试!

输入关键词,即可 无障碍访问 必应 搜索,快速找到本站所有 AI 工具。

邮箱

联系我们

回顶部

zh_CN简体中文