Telegram 搜索机器人BOT

司空实
2023-12-01

 一、小伙伴们,当你玩 TG 时,如果自己建了一个群或者频道,人多了以后往往会找不到聊天记录或者文件,这时就可以用这个机器人来帮你实现搜索。

     1,通过关键词来检索你想要的内容。这个时候你就会想打造属于你的 telegramBot,直接发送你要搜索的关键字,就可以定位到你想要的内容,搜索支持 Lucene 语法。

     2, 机器人同时可以为群组、频道、个人提供聊天记录搜索服务。

     3, 工作原理是使用 Telegram Client API 获取频道内所有信息,并持续监听新信息;所有信息都会归档到 Elasticsearch 搜索引擎,用户可以在 Bot 前端执行搜索。

二、那么如何搭建呢?

  1. 前提条件

申请 Telegram MTProto API ID: https://my.telegram.org/app

申请 Telegram Bot ID:@BotFather

准备一个 Telegram 账号

安装 Python3:Download Python | Python.org(https://www.python.org/downloads/)

下载源代码:

  2. 安装依赖:pip install -r requirements.txt

修改 main.py 中的配置或使用环境变量

  • API_ID:Telegram MTProto API ID
  • API_HASH:Telegram MTProto API Hash
  • BOT_TOKEN:从 BotFather 获取的 bot token
  • CHAT_ID:你要搜索的 chat 的 ID,可以使用 @getidsbot 获取。
  • ADMIN_ID:管理员的 ID,可以使用 @getidsbot 获取。
  • 先创建一个 session 文件夹(mkdir session),运行 python main.py 提示输入手机号和验证码即可,session文件夹里面会生成几个数据库文件。
  • 部署
  • 把 session 文件夹和源码部署到服务器。
  • 修改 docker-compose.yml 中的环境变量
  • 使用 docker-compose 部署:docker-compose up -d
  • 启动完成后用管理员的账号(之前配置的 ADMIN_ID)向 Bot 发送命令 /download_history 下载历史记录。
  • 源代码如下。
  • from telethon import TelegramClient, events, Button
    import socks
    import asyncio
    import html
    import os
    
    # Runtime configuration. Each setting is read from the environment variable
    # of the same name; when the variable is unset or empty the hard-coded
    # placeholder is used instead. Values coming from the environment are
    # strings, while the numeric fallbacks (REDIS_PORT, API_ID) are ints.
    REDIS_HOST = os.environ.get("REDIS_HOST") or 'localhost'
    REDIS_PORT = os.environ.get("REDIS_PORT") or 6379
    ELASTIC_URL = os.environ.get("ELASTIC_URL") or 'http://localhost:9200/'
    API_ID = os.environ.get("API_ID") or 123456
    API_HASH = os.environ.get("API_HASH") or 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    BOT_TOKEN = os.environ.get("BOT_TOKEN") or '123456789:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    CHAT_ID = os.environ.get("CHAT_ID") or '-1001254246410'
    ADMIN_ID = os.environ.get("ADMIN_ID") or '345796292'
    
    # Elasticsearch client used by the index/search helpers below.
    from elasticsearch import Elasticsearch
    es = Elasticsearch([ELASTIC_URL])
    
    # Redis client; it stores the query text behind each bot reply so the
    # pagination buttons can re-run the same search.
    import redis
    db = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    
    # Telegram credentials handed to both TelegramClient instances.
    # https://docs.telethon.dev/en/latest/basic/signing-in.html
    api_id = str(API_ID)
    api_hash = API_HASH
    bot_token = BOT_TOKEN
    
    # Proxy passed to both TelegramClient instances; None means no proxy is
    # configured. Uncomment the tuple below to route through a local SOCKS5 proxy.
    # proxy = (socks.SOCKS5, '127.0.0.1', 7777)
    proxy = None
    
    # Numeric ids: the chat whose messages are indexed, and the only user
    # allowed to trigger /download_history.
    chat_id = int(CHAT_ID)
    admin_id = int(ADMIN_ID)
    
    # Reply sent by the bot for the /start command (sent with
    # parse_mode='markdown'). The text is user-facing and tells users that
    # queries accept Lucene syntax, with three example queries.
    welcome_message = '''
    这里是 你 的搜索 Bot,直接发送你要搜索的内容即可。搜索支持 Lucene 语法。
    例如:
    `每日速览`
    `+每日速览 +date:2019-12-25`
    `+每日速览 +date:[2019-12-25 TO 2019-12-30]`
    '''
    
    # Id used in https://t.me/c/<share_id>/<message_id> links. For a negative
    # chat id the sign is flipped and 1000000000000 is subtracted; a
    # non-negative id (or a zero result) falls back to chat_id itself.
    if chat_id < 0:
        share_id = -chat_id - 1000000000000 or chat_id
    else:
        share_id = chat_id

    # One Elasticsearch index per chat.
    elastic_index = "chat%s" % chat_id

    # Field mapping for indexed messages: `content` is analysed with the ik
    # analyzers named below, `url` is plain text and `date` is a date field.
    mapping = {
        "properties": {
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart",
            },
            "url": {"type": "text"},
            "date": {"type": "date"},
        },
    }
    
    def ensureElasticIndex(index, mapping):
        """Create the Elasticsearch index with its mapping if it is missing.

        Args:
            index: Name of the index to check and, if absent, create.
            mapping: Mapping body applied to the newly created index.

        An index that already exists is left untouched.
        """
        # Honour the `index` argument rather than the module-level
        # `elastic_index`, so the helper works for any index name.
        if not es.indices.exists(index=index):
            es.indices.create(index=index)
            es.indices.put_mapping(index=index, body=mapping)
    
    def deleteElasticIndex(index):
        """Delete the Elasticsearch index *index* if it exists.

        Args:
            index: Name of the index to remove. A missing index is ignored.
        """
        # Honour the `index` argument rather than the module-level
        # `elastic_index`.
        if es.indices.exists(index=index):
            es.indices.delete(index=index)
    
    def search(q, from_, size=10):
        """Run a query-string search against the chat index.

        Args:
            q: Query string supplied by the user; matched against the
                `content` field by default (``df="content"``).
            from_: Offset of the first hit to return (pagination).
            size: Maximum number of hits to return. Defaults to 10.

        Returns:
            The response of ``es.search``, including highlight fragments for
            `content` wrapped in ``<b>``/``</b>`` tags.
        """
        # Make sure the index exists so a search before any message was
        # indexed does not fail on a missing index.
        ensureElasticIndex(index=elastic_index, mapping=mapping)
        highlight = {
            "pre_tags": ["<b>"],
            "post_tags": ["</b>"],
            "fields": {
                "content": {
                    "fragment_size": 15,
                    "number_of_fragments": 3,
                    "fragmenter": "span",
                },
            },
        }
        # Pass the caller's `size` through; it used to be hard-coded to 10.
        return es.search(
            index=elastic_index,
            q=q,
            df="content",
            size=size,
            from_=from_,
            body={"highlight": highlight},
        )
    
    def renderRespondText(result, from_):
        """Render an Elasticsearch search response as an HTML message.

        Args:
            result: Response dict from ``es.search``; ``hits.total.value``,
                ``hits.hits`` and ``took`` are read.
            from_: Zero-based offset of the first hit, used to number results.

        Returns:
            HTML text: a total-count header, one numbered link per hit and the
            elapsed time in seconds.
        """
        total = result['hits']['total']['value']
        lines = ['搜索到%d个结果:\n' % total]
        for position, hit in enumerate(result['hits']['hits'], start=from_ + 1):
            source = hit['_source']
            fragments = hit.get('highlight', {}).get('content')
            # Prefer the first highlighted fragment; when there is none (or it
            # is empty) fall back to the first 15 characters of the message.
            snippet = (fragments[0] if fragments else '') or source['content'][0:15]
            lines.append('%d. <a href="%s">%s</a>\n' % (position, source['url'], snippet))
        # `took` is reported in milliseconds; show seconds.
        lines.append('耗时%.3f秒。' % (result['took'] / 1000))
        return ''.join(lines)
    
    def renderRespondButton(result, from_):
        """Build the previous/next pagination buttons for a search reply.

        Args:
            result: Response dict from ``es.search``; only ``hits.total.value``
                is read.
            from_: Offset of the page currently shown.

        Returns:
            One row of two inline buttons whose callback data is the offset of
            the previous and next page, as a decimal string.
        """
        page_size = 10
        total = result['hits']['total']['value']
        # Offset of the last page that still has hits. Using (total - 1) keeps
        # an exact multiple of the page size (e.g. 20 hits) from yielding an
        # empty trailing page; max() covers the zero-hit case.
        last_page_start = max((total - 1) // page_size * page_size, 0)
        previous_offset = max(from_ - page_size, 0)
        next_offset = min(from_ + page_size, last_page_start)
        return [
            [
                Button.inline('上一页⬅️', str(previous_offset)),
                Button.inline('➡️下一页', str(next_offset)),
            ]
        ]
    
    @events.register(events.NewMessage)
    async def ClientMessageHandler(event):
        """Index a new message from the watched chat into Elasticsearch.

        Messages from other chats, and messages whose text is empty or only
        whitespace, are ignored. The document id is the Telegram message id.
        """
        if event.chat_id != chat_id:
            return
        text = event.raw_text
        # The former `len(text.strip()) >= 0` test was always true; skip
        # messages that carry no searchable text.
        if not text or not text.strip():
            return
        document = {
            # Stored HTML-escaped because search results are sent as HTML.
            "content": html.escape(text).replace('\n', ' '),
            "date": int(event.date.timestamp() * 1000),  # epoch milliseconds
            "url": "https://t.me/c/%s/%s" % (share_id, event.id),
        }
        es.index(index=elastic_index, body=document, id=event.id)
    
    @events.register(events.CallbackQuery)
    async def BotCallbackHandler(event):
        """Handle a pagination button press on a search reply.

        The callback data is the offset of the requested page. The query text
        is looked up in Redis under ``msg-<message_id>-q``; if found, the
        search is re-run at that offset and the message is edited in place.
        """
        try:
            if event.data:
                from_i = int(event.data)
                q = db.get('msg-' + str(event.message_id) + '-q')
                if q:
                    result = search(q, from_i)
                    respond = renderRespondText(result, from_i)
                    buttons = renderRespondButton(result, from_i)
                    await event.edit(respond, parse_mode='html', buttons=buttons)
        finally:
            # Always acknowledge the callback, even if the search or the edit
            # raised; the exception still propagates afterwards.
            await event.answer()
    
    async def downloadHistory():
        """Rebuild the Elasticsearch index from the chat's full history.

        The existing index is deleted and recreated, then every message
        returned by ``client.iter_messages(chat_id)`` that has non-blank text
        is indexed under its Telegram message id.
        """
        deleteElasticIndex(index=elastic_index)
        ensureElasticIndex(index=elastic_index, mapping=mapping)
        async for message in client.iter_messages(chat_id):
            if message.chat_id != chat_id:
                continue
            text = message.raw_text
            # The former `len(text.strip()) >= 0` test was always true; skip
            # messages that carry no searchable text.
            if not text or not text.strip():
                continue
            print(message.id)  # progress output
            es.index(
                index=elastic_index,
                body={
                    "content": html.escape(text).replace('\n', ' '),
                    "date": int(message.date.timestamp() * 1000),
                    "url": "https://t.me/c/%s/%s" % (share_id, message.id),
                },
                id=message.id,
            )
    
    @events.register(events.NewMessage)
    async def BotMessageHandler(event):
        """Handle a message sent to the bot.

        * ``/start`` replies with the welcome message.
        * ``/download_history`` from the admin chat rebuilds the index.
        * Any other non-blank text is treated as a search query; the first
          page of results is sent with pagination buttons, and the query is
          stored in Redis under ``msg-<reply id>-q`` for the callback handler.

        Messages without text (e.g. media with no caption) are ignored.
        """
        text = event.raw_text or ''
        if text.startswith('/start'):
            await event.respond(welcome_message, parse_mode='markdown')
        elif text.startswith('/download_history') and event.chat_id == admin_id:
            # Download the whole chat history.
            await event.respond('开始下载历史记录', parse_mode='markdown')
            await downloadHistory()
            await event.respond('下载完成', parse_mode='markdown')
        elif text.strip():
            from_i = 0
            q = text
            result = search(q, from_i)
            respond = renderRespondText(result, from_i)
            buttons = renderRespondButton(result, from_i)
            msg = await event.respond(respond, parse_mode='html', buttons=buttons)

            # Remember the query so the pagination buttons can re-run it.
            db.set('msg-' + str(msg.id) + '-q', q)
    
    # Both Telegram clients share one event loop.
    loop = asyncio.get_event_loop()

    # User-account client: listens to the watched chat and indexes new
    # messages. Its session is stored under session/client.
    client = TelegramClient('session/client', api_id, api_hash, connection_retries=None, proxy=proxy, loop=loop)
    client.add_event_handler(ClientMessageHandler)
    client.start()

    # Bot client: answers commands, search queries and pagination callbacks.
    # Its session is stored under session/bot.
    bot = TelegramClient('session/bot', api_id, api_hash, connection_retries=None, proxy=proxy, loop=loop)
    bot.add_event_handler(BotMessageHandler)
    bot.add_event_handler(BotCallbackHandler)
    bot.start(bot_token=bot_token)

    # Serve until interrupted; Ctrl+C is treated as a normal shutdown.
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    
    

 类似资料: