当前位置: 首页 > 知识库问答 >
问题:

Redis使用Java为每个消费者发送一条消息

金皓君
2023-03-14

我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。

我从redislabs开始学习本教程

守则:

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> syncCommands = connection.sync();

    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
    } catch (RedisBusyException redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
    }

    System.out.println("Waiting for new messages ");

    while (true) {
        List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

        if (!messages.isEmpty()) {
            System.out.println(messages.size()); // 
            for (StreamMessage<String, String> message : messages) {
                System.out.println(message.getId());
                Thread.sleep(5000);
                syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
            }
        }

    }

我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正在等待,而一个消费者同时处理10条消息。

提前谢谢!

共有1个答案

吕衡
2023-03-14

请注意,XREADGROUP可以获取COUNT参数

通过传递XReadArgs,在xreadgroupxreadgroup中查看JavaDoc如何执行此操作。

 类似资料:
  • 在Apache Kafka 0.8.2 office文档的第5.6节“分销、消费者和消费者群体”小节中,它说 组中的使用者尽可能公平地划分分区,每个分区仅由一个消费组中的一个使用者使用。 但是我发现,在实践中,一个消费者组中的多个消费者可以通过从同一主题分区发送 FetchRequest 来使用单个分区中的数据。 在接下来的消费者身份证登记处小节中 除了由一个组中的所有使用者共享的group_id

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我有两个消费群体,即G1和G2。 null 类似地,当G2轮询后,它仍然会找到关于主题的消息。这里还枯萎M3或M4会收到消息吗? 我也相信所有的成员都应该在同一个节点上。对吧?客户端代码或Kafka的责任是选择一个组中的特定成员吗?