这是利用api添加的任务列表,不会添加到客户端的定时发送消息那边,但是实现功能是一样的。
当然滥用也可能存在封号风险。只是时不时的有人问,需要定时发送消息的bot,就用cursor写了一个简单的,有需要的自行上车。
功能:
/add <群组id> <时间间隔> <条目> <消息内容>
如添加
群组1,添加每5分钟发送消息,50条,消息内容1
群组2,添加每10分钟发送消息,80条,消息内容2
/cancel <群组id>
如
取消群组1,设置的定时消息。
/list
查看所有添加的任务列表
依赖
pip install telethon
.env
API_ID=1xxx
API_HASH=fxxx
BOT_TOKEN=78xxx
main.py
from telethon import TelegramClient, events
from datetime import datetime, timedelta
import asyncio
import re
from typing import Dict, List, Tuple
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# Telegram API 配置
API_ID = os.getenv('API_ID')
API_HASH = os.getenv('API_HASH')
BOT_TOKEN = os.getenv('BOT_TOKEN')
# 存储定时任务的字典
# 格式: {group_id: (interval_minutes, message_count, message_content, task)}
scheduled_tasks: Dict[int, Tuple[int, int, str, asyncio.Task]] = {}
# 初始化客户端
client = TelegramClient('bot_session', API_ID, API_HASH)
async def schedule_messages(group_id: int, interval: int, count: int, message: str):
"""定时发送消息的协程"""
try:
# 先获取实体信息
entity = await client.get_entity(group_id)
except Exception as e:
print(f"获取实体信息失败: {e}")
return
sent_count = 0
while sent_count < count and group_id in scheduled_tasks:
try:
await client.send_message(entity, message)
sent_count += 1
print(f"已发送消息到 {group_id},第 {sent_count}/{count} 条")
await asyncio.sleep(interval * 60) # 转换为秒
except Exception as e:
print(f"发送消息时出错: {e}")
break
@client.on(events.NewMessage(pattern='/add'))
async def add_scheduled_messages(event):
"""添加定时消息任务"""
try:
# 解析命令参数: /add <群组id> <时间间隔> <条目> <消息内容>
params = event.text.split(maxsplit=4)
if len(params) != 5:
await event.respond("格式错误!请使用: /add <群组id> <时间间隔(分钟)> <消息数量> <消息内容>")
return
group_id = int(params[1])
interval = int(params[2])
count = int(params[3])
message = params[4]
# 验证群组是否存在
try:
entity = await client.get_entity(group_id)
except ValueError:
await event.respond("无法找到指定的群组/频道/用户,请检查ID是否正确")
return
# 如果该群组已有任务,先取消
if group_id in scheduled_tasks:
scheduled_tasks[group_id][3].cancel()
del scheduled_tasks[group_id]
# 创建新任务
task = asyncio.create_task(schedule_messages(group_id, interval, count, message))
scheduled_tasks[group_id] = (interval, count, message, task)
await event.respond(f"成功添加定时消息任务:\n"
f"目标: {entity.title if hasattr(entity, 'title') else entity.first_name}\n"
f"ID: {group_id}\n"
f"间隔时间: {interval}分钟\n"
f"消息数量: {count}\n"
f"消息内容: {message}")
except ValueError as e:
await event.respond(f"参数格式错误!请确保群组ID、时间间隔和消息数量都是数字。\n错误信息: {str(e)}")
except Exception as e:
await event.respond(f"添加任务时出错: {str(e)}")
@client.on(events.NewMessage(pattern='/cancel'))
async def cancel_scheduled_messages(event):
"""取消定时消息任务"""
try:
params = event.text.split()
if len(params) != 2:
await event.respond("格式错误!请使用: /cancel <群组id>")
return
group_id = int(params[1])
if group_id in scheduled_tasks:
scheduled_tasks[group_id][3].cancel()
del scheduled_tasks[group_id]
await event.respond(f"已取消群组 {group_id} 的定时消息任务")
else:
await event.respond(f"群组 {group_id} 没有正在运行的定时消息任务")
except ValueError:
await event.respond("群组ID必须是数字!")
except Exception as e:
await event.respond(f"取消任务时出错: {str(e)}")
@client.on(events.NewMessage(pattern='/list'))
async def list_scheduled_messages(event):
"""列出所有定时消息任务"""
if not scheduled_tasks:
await event.respond("当前没有正在运行的定时消息任务")
return
response = "当前运行的定时消息任务:\n\n"
for group_id, (interval, count, message, _) in scheduled_tasks.items():
try:
entity = await client.get_entity(group_id)
name = entity.title if hasattr(entity, 'title') else entity.first_name
response += f"目标: {name}\n"
except:
response += f"目标ID: {group_id}\n"
response += f"间隔时间: {interval}分钟\n"
response += f"消息数量: {count}\n"
response += f"消息内容: {message}\n"
response += "-" * 30 + "\n"
await event.respond(response)
@client.on(events.NewMessage(pattern='/help'))
async def show_help(event):
"""显示帮助信息"""
help_text = """
📝 定时消息机器人使用帮助:
1️⃣ 添加定时消息任务:
/add <群组id> <时间间隔> <条目> <消息内容>
例如:/add 123456789 5 50 这是测试消息
2️⃣ 取消定时消息任务:
/cancel <群组id>
例如:/cancel 123456789
3️⃣ 查看所有任务:
/list
4️⃣ 显示帮助信息:
/help
注意事项:
- 群组ID可以通过转发消息到 @raw_data_bot 获取
- 确保机器人是群组/频道的成员
- 对于私聊,需要用户先与机器人对话
"""
await event.respond(help_text)
async def start():
"""启动客户端和机器人"""
# 先以用户身份登录
await client.start()
print("客户端已登录")
# 再以机器人身份登录
await client.start(bot_token=BOT_TOKEN)
print("机器人已登录")
# 运行直到断开连接
await client.run_until_disconnected()
# 运行机器人
def main():
"""主函数"""
print("正在启动...")
loop = asyncio.get_event_loop()
loop.run_until_complete(start())
if __name__ == '__main__':
main()
上一篇:
将 telegram 作为 S3 存储,兼容Amazon S3 API下一篇:
一个开源家族谱展示网页项目