我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。
我从redislabs开始学习本教程
守则:
RedisClient redisClient = RedisClient.create("redis://pw@host:port");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
try {
syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
} catch (RedisBusyException redisBusyException) {
System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
}
System.out.println("Waiting for new messages ");
while (true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
if (!messages.isEmpty()) {
System.out.println(messages.size()); //
for (StreamMessage<String, String> message : messages) {
System.out.println(message.getId());
Thread.sleep(5000);
syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
}
}
}
我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正在等待,而一个消费者同时处理10条消息。
提前谢谢!
请注意,XREADGROUP可以获取COUNT
参数。
通过传递XReadArgs,在xreadgroup
xreadgroup中查看JavaDoc如何执行此操作。
在Apache Kafka 0.8.2 office文档的第5.6节“分销、消费者和消费者群体”小节中,它说 组中的使用者尽可能公平地划分分区,每个分区仅由一个消费组中的一个使用者使用。 但是我发现,在实践中,一个消费者组中的多个消费者可以通过从同一主题分区发送 FetchRequest 来使用单个分区中的数据。 在接下来的消费者身份证登记处小节中 除了由一个组中的所有使用者共享的group_id
问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在
在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。
我有两个消费群体,即G1和G2。 null 类似地,当G2轮询后,它仍然会找到关于主题的消息。这里还枯萎M3或M4会收到消息吗? 我也相信所有的成员都应该在同一个节点上。对吧?客户端代码或Kafka的责任是选择一个组中的特定成员吗?