当前位置: 首页 > 面试题库 >

Flask-SocketIO Redis订阅

齐典
2023-03-14
问题内容

我正在使用https://github.com/miguelgrinberg/Flask-
SocketIO
来实现WebSocket服务器。

我需要从另一个进程(仅订阅)接收消息,并为特定房间中的客户端发出消息。

但是,当我尝试发送消息时,出现此错误:

无法将消息发送到家庭会议室:在请求上下文之外工作。

这是我的代码:

from flask import Flask, request
from flask_socketio import SocketIO, join_room, leave_room, send, rooms
import json
import eventlet
import logging
import redis
import threading

FORMAT = '%(asctime)-15s - %(message)s'
logging.basicConfig(format=FORMAT)
log = logging.getLogger(__name__)

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode='eventlet')

.
.
.

def _send_task_message():
    try:
        send(json.dumps({"type":"UPDATE_TASK"}), room='home')
    except Exception as e:
        log.error('Could not send message to home room: %s' % str(e))

class Listener(threading.Thread):
    def __init__(self, r, channels):
        threading.Thread.__init__(self)
        self.daemon = True
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.psubscribe(channels)

    def work(self, item):
        if isinstance(item['data'], bytes):
            try:
                msg = item['data'].decode('utf-8')
                decode_msg = json.loads(msg)                
                if decode_msg['type'] == 'UPDATE_TASK':
                    _send_task_message()
            except ValueError as e:
                log.error("Error decoding msg to microservice: %s", str(e))

    def run(self):
        for item in self.pubsub.listen():
            self.work(item)


if __name__ == '__main__':

    r = redis.Redis()
    client = Listener(r, ['/bobguarana/socketio'])
    client.start()

    socketio.run(debug=True, app=app, port=8080)

问题答案:

我解决了将应用程序作为参数传递给类并按照错误描述的建议使用它的上下文,但是名称空间也是必需的:

class Listener(threading.Thread):
    def __init__(self, r, channels, app):
    threading.Thread.__init__(self)
    self.daemon = True
    self.redis = r
    self.pubsub = self.redis.pubsub()
    self.pubsub.psubscribe(channels)
    self.app = app

    def work(self, item):
        with app.app_context():
            if isinstance(item['data'], bytes):
                try:
                    msg = item['data'].decode('utf-8')
                    decode_msg = json.loads(msg)                
                    if decode_msg['type'] == 'UPDATE_TASK':
                        send(json.dumps({"type":"UPDATE_TASK"}), room='home', namespace='/')
                    #_send_task_message()
                except ValueError as e:
                    log.error("Error decoding msg to microservice: %s", str(e))

    def run(self):
        for item in self.pubsub.listen():
            self.work(item)

if __name__ == '__main__':

    r = redis.Redis()
    client = Listener(r, ['/bobguarana/socketio'], app)
    client.start()

    socketio.run(debug=True, app=app, port=8080)


 类似资料:
  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 订阅指过滤表(table)的规则,Canal 客户端发送给客户端订阅规则,那么服务端将会推送符合规则的表数据过来,采用正则匹配。 允许所有表:.\*\\\\..\*

  • 问题内容: 在官方的快速入门中,建议在使用单个 模块 时使用: 2. …如果您使用的是单个模块(如本例所示),则应使用,因为取决于它是作为应用程序启动还是作为模块导入,其名称将有所不同(与实际导入名称不同)。… 但是,在他们的API文档中,当我的应用程序为 软件包 时,建议进行硬编码: 因此,您在此处提供的内容很重要。如果使用单个模块,则始终为正确的值。但是,如果您使用的是包,通常建议在其中硬编码

  • 在前面,我们介绍了 REST Web 服务,并使用 Flask 提供服务。这里,我们使用第三方库 Flask-RESTful,它使得在 Flask 中提供 REST 服务变得更加简单。 安装 使用 pip 安装: $ pip install flask-restful 使用 下面我们主要使用官方文档的例子进行说明。 Hello World 我们先来看一个简单的例子。 # -*- coding: u

  • Bootstrap 是 Twitter 开源的一个 CSS/HTML 框架,它让 Web 开发变得更加迅速,简单。要想在我们的 Flask 应用中使用 Boostrap,有两种方案可供选择: 第 1 种,在我们的 Jinja 模板中直接引入 Bootstrap 层叠样式表 (CSS) 和 JavaScript 文件,比如 bootstrap.min.css,bootstrap.min.js; 第

  • 在 Web 应用中,我们经常需要保护我们的 api,以避免非法访问。比如,只允许登录成功的用户发表评论等。Flask-HTTPAuth 扩展可以很好地对 HTTP 的请求进行认证,不依赖于 Cookie 和 Session。本文主要介绍两种认证的方式:基于密码和基于令牌 (token)。 安装 使用 pip 安装: $ pip install Flask-HTTPAuth 基于密码的认证 为了简化