当前位置: 首页 > 面试题库 >

如何从django-celery 3任务发送Channel 2.x组消息?

孟福
2023-03-14
问题内容

我需要推迟发送频道消息。这是我的代码:

# consumers.py
class ChatConsumer(WebsocketConsumer):
    def chat_message(self, event):
        self.send(text_data=json.dumps(event['message']))

    def connect(self):
        self.channel_layer.group_add(self.room_name, self.channel_name)
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        send_message_task.apply_async(
            args=(
                self.room_name,
                {'type': 'chat_message',
                 'message': 'the message'}
            ),
            countdown=10
        )

# tasks.py
@shared_task
def send_message_task(room_name, message):
    layer = get_channel_layer()
    layer.group_send(room_name, message)

该任务正在执行,我看不到任何错误,但消息未发送。仅当我从消费者类方法发送时才有效。

我也尝试使用AsyncWebsocketConsumer并使用AsyncToSync(layer.group_send)发送。错误为“您不能在与异步事件循环相同的线程中使用AsyncToSync-
请直接等待异步功能。”

然后,我尝试将send_message_task声明为异步并使用await。再也没有发生任何事情(没有错误),并且我不确定该任务是否已执行。

以下是版本:

Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1

设定:

REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
        },
    },
}

有任何想法吗?

UPD: 刚刚发现redis通道层已恢复,但是没有调用而是跳过了group_send方法

UPD 2:
使用AsyncToSync(layer.group_send)从控制台工作进行发送。不带调用任务apply_async也可以。但是,将其运行会apply_async导致错误You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly。将任务定义为异步并使用await当然也会破坏一切。


问题答案:

也许这不是开始问题的直接答案,但这可能会有所帮助。如果出现异常“您不能在与异步事件循环相同的线程中使用AsyncToSync-
请直接等待异步函数”,那么您可能会做出这样的选择:

  1. 事件循环在某处创建
  2. 一些ASYNC代码已启动
  3. 从ASYNC代码中调用了一些SYNC代码
  4. SYNC代码正在尝试使用AsyncToSync调用ASYNC代码,以防止出现这种情况

似乎AsyncToSync检测到外部事件循环并做出不干扰它的决定。

解决方案是将您的异步调用直接包含在外部事件循环中。下面是示例代码,但是最好是检查您的情况,并确定外部循环正在运行…

loop = asyncio.get_event_loop()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))


 类似资料:
  • 我已在中创建了我的应用程序并配置为云消息传递。当我从控制台发送通知时,设备会收到通知,但如果我尝试通过Rest API(使用PostMan)发送。然后通知不会到达设备,但响应显示为成功。 这是我的邮差请求 URI-https://fcm.googleapis.com/fcm/send 标题:内容类型:应用程序/json授权:密钥=MY_SERVER_KEY 正文:{“数据”:{“标题”:“火力基地

  • 问题内容: 如何从Java发送SMTP消息? 问题答案: 这是Gmail smtp的示例: 现在,仅当您希望将项目依赖关系降至最低时,才可以这样做,否则我可以热烈推荐使用apache中的类 http://commons.apache.org/email/ 问候 托雷·雅各布森

  • 问题内容: 我将实现类似于Facebook通知和此网站的内容(StackOverflow的通知会通知我们是否有人为我们的问题写评论/答案等)。请注意,用户将使用我的应用程序作为网站而不是移动应用程序。 我遇到以下获取结果的答案,但我需要推送结果而不是获取结果。 根据建议,我在实体类中创建了一个简单方法,并向其中添加了@PostPersist,但此方法不起作用,因此基于此答案,我添加了persist

  • 问题内容: 我完全陷入困境,因为我无法让群组消息与Channels 2一起使用!我已经遵循了所有可以找到的教程和文档,但是遗憾的是,我还没有发现问题所在。我现在想做的是有一个特定的URL,该URL在被访问时应该向名为“ events”的组广播一条简单的消息。 首先,这是我在Django中采用的相关设置和当前设置: 接下来,这是我的EventConsumer,它以非常基本的方式扩展了JsonWebs

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 我正在尝试发送添加了标题的JSON格式。csv格式到前端进行下载。在发送HTTP响应时,我遇到了一个“不是JSON可序列化的”错误。 我的 文件: 打印响应cmd正在打印: 但是,最后一行会抛出如下错误: