我将Kafka2.9.2-0.8.1.1与zookeeper 3.4.6一起使用。 是否有一个实用程序可以自动从Zookeeper中删除一个消费者组?或者我可以删除zookeeper中/consumers/[group_id]下的所有内容吗?如果是后者,我还缺少什么吗&这可以用一个动态系统来完成吗? 更新:从kafka 2.3.0版本开始,有一个新的实用程序: 相关文档:http://kafka.
我刚刚开始使用动物园管理员在卡拉夫的DOSGi。我在Karaf的一个实例中提供服务,在另一个实例中提供消费者。服务端运行良好。一旦发布,我可以在安装了Zookeeper服务器的Karaf控制台中使用log:display命令查看它,并且我也可以通过浏览器访问wsdl。问题出在消费端。当服务启动时,它应该写一条消息(下面的ref代码),但是它从来没有发生。消费者代码: 和component.xml:
为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:
我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正
我们公司在Go中建立了push服务,为了保证传输速度,我们在四台机器上安装了push服务,当我们需要发送通知时,我们将消息发送给rabbitMQ,然后push服务从队列中获取消息,但有时我们发现只有一台机器获取消息。 我应该如何设置配置以确保每个消费者获得相同数量的消息?
我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少 我为这个问题创建了一个测试用例: 运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加:
使用RabbitMQ,有没有一种方法可以将消息从队列“推送”给使用者,而不是让使用者从队列“轮询并拉出”消息? 这也是我目前正在进行的一个项目引起一些争论的原因。一个方面的论点是,让使用者(即windows服务)“轮询”队列直到新消息到达,与将消息从队列自动“推送”到订户/使用者的想法相比,这种想法有些低效,也不太理想。 我似乎只能找到支持消费者从队列中“轮询并拉出”消息的信息(例如,使用wind
我有一个Kafka主题,目前有3个分区。我希望我的消费者从同一个分区读取,但每条消息都应该以循环方式发送给不同的消费者。有可能实现吗?
我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点? 谢谢
我正在尝试Kafka跨国制作人在Java。 就像 它没有抛出任何错误。并且也在Kafka中推送消息,它是可用的。 我可以看到经纪人的日志是这样的: 5分钟后,我找到了这个经纪人日志。[2017-10-30 19:36:44123]信息[Broker 1001上的组元数据管理器]:在0毫秒内删除了0个过期的偏移量。(kafka.coordinator.group.GroupMetadataManag
我使用一个生产者在本地主机上运行的Kafka服务器中输入了一些消息。Zookeeper也在本地主机上。我使用了这里给出的。 但是,消费者似乎没有收到任何消息! 脚本可以提取所有这些消息,但代码不能。怎么了?消费者代码与该页面上给出的代码完全相同。 这是我发布消息的同一主题。以下是制作人的代码:
我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。
我有一个Kafka主题,我正在通过Kafka生产者发送数据。现在在消费者方面,我有两个选择。 null
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?