本文使用 graia-scheduler
包进行开发.
实际上是python的异步功能, 但是我不会, 所以就找到了作者写好的包
pip install graia-broadcast
pip install graia-scheduler
import asyncio
from graia.broadcast import Broadcast
from graia.scheduler import GraiaScheduler
from graia.scheduler.timers import crontabify
loop = asyncio.get_event_loop()
bcc = Broadcast(loop=loop, debug_flag=True)
scheduler = GraiaScheduler(loop, bcc)
@scheduler.schedule(crontabify("* * * * * *"))
async def something_scheduled():
await printLog()
async def printLog():
print("print every seconds.")
loop.run_forever()
GraiaScheduler 是作者实现的一个类, 构造函数有两个参数, 一个是python asyncio的loop, 另一个是GraiaBroadcast.
注意广播(Boardcast)也需要传参数loop, 而且两个loop应该是同一个, 所以loop要先实例化.
异步函数标识, 不懂, 先放着, 学了以后再来, 反正这么用就行了
就调用异步函数前需要 await
一下.
然后这里定时任务都写成异步函数.
在def一个函数前用 @scheduler.schedule()
进行标注, 表明override scheduler.schedule()
函数.
override schedule 时需要传入 crontabify 以定时执行任务. crontabify 的参数解释如下:
五个空格分成了六段, 以此表示 分钟, 小时, 月, 日, 周, 秒
'30 12 1 1 * 0'
表示在一月一号12:30:00执行.\*
表示任意, 如 '* * * * * *'
表示在任意月日周小时分钟秒都执行, 也就是每秒执行一次.','
可以用来分割数字, 如 '0,30 12 1 1 * 0'
表示在一月一号12:00:00和12:30:00各执行一次.顾名思义. 要放在所有函数定义完以后用???
from graia.broadcast import Broadcast
from graia.application import *
from graia.application.message.chain import MessageChain
from graia.application.message.elements.internal import *
from graia.scheduler import GraiaScheduler
from graia.scheduler.timers import crontabify
from graia.scheduler import timers
import asyncio
import messages
GROUP_ID_NGCS = 1234567
message_remind = MessageChain.create([Plain('张炀杰要在竹园大厅表演 AK ICPC WF 吗qaq')])
loop = asyncio.get_event_loop() #异步循环
bcc = Broadcast(loop=loop)
app = GraiaMiraiApplication(
broadcast=bcc,
connect_info=Session(
host='http://localhost:8080', # 填入 httpapi 服务运行的地址
authKey='FUCKTENCENT', # 填入 authKey
account=123456, # 你的机器人的 qq 号
websocket=True # Graia 已经可以根据所配置的消息接收的方式来保证消息接收部分的正常运作.
)
)
# 定时任务
scheduler = GraiaScheduler(loop, bcc)
@scheduler.schedule(crontabify('30 19 * * 5 0'))
async def scheduledRemindBookRoom():
await messages.sendGroupsMessages(app=app, groups_id=[GROUP_ID_NGCS], messages=[message_remind])
app.launch_blocking()
loop.run_forever()
message.py
如下:
from graia.application import GraiaMiraiApplication
from graia.application.message.chain import MessageChain
from graia.application.group import Group
import asyncio
async def sendGroupsMessages(app, groups_id, messages):
groups = await app.groupList()
for group in groups:
if (group.id in groups_id):
for message in messages:
await app.sendGroupMessage(group, message)