我正在使用kafka 0.10 kafkaConsumer API获取消费者和消费者订阅下的主题集。 现在我可以通过topiclist方法成功地获取主题和分区。 要获取消费者组数据,我在KafkaConsumer下找不到方法,但我可以通过“zookeeper.getChildren(ZkUtils.ConsumerPath(),false);”从zookeeper获取组列表。 我的问题是如何得到群
当消费者实例组出现时,会不会对Kafka的性能产生任何影响。重新启动时id已更改。老年人会发生什么。id它是否仍在代理内存中,或者何时将被删除?假设我有1000个消费者实例,并且所有实例都动态分配组。重新启动时的id。 可以为{log.retention.ms'}提供什么列表值。我可以设置为1毫秒吗?
/usr/local/kafka2.12-2.6.0/config/server.properties 在开始动物园管理员和Kafka之后,创建一个新的主题 检查所有三个节点上的集群状态
我目前正在学习Scala 消费者应能够处理以下任务: 跟踪偏移量 找出哪个代理是主题和分区的主代理。 必须能够处理代理领导变更 我找到了一个非常好的文档,用Java创建这个消费者(https://cwiki.apache.org/confluence/display/KAFKA/0.8.0SimpleConsumer示例)。 有没有人有一个创建这个simpleconsumer的示例Scala代码,
当Kafka消费者未能反序列化消息时,客户端应用程序是否有责任处理Poison Message? 或者 Kafka 是否“递增”了消息偏移并继续使用有效消息? 是否有处理Kafka主题中的有害信息的“最佳实践”?
我正在使用Apache Camel2.13.1轮询一个数据库表,其中将有300k行以上。我希望使用幂等使用者EIP来过滤已经处理过的行。 不过,我想知道这个实现是否真的是可伸缩的。我的骆驼上下文是:- 在1908988是request.body.id的情况下,我已经将EIP设置为键上,所以这并不容易合并到我的查询中。 是否有更好的方法将CAMEL_MESSAGEPROCESSED表用作select
我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认
我们已经设置了MirrorMaker来跨两个Kafka集群复制消息。我们还在镜像制造商消费者属性中设置了来复制内部主题。我假设这也将复制,这将反过来同步辅助集群中的消费者组偏移量。 但是,当我们在二级集群中启动消费者组时,它从一开始就开始使用消息,因此看起来消费者组偏移量在二级群集中没有得到复制。 有人能提供一些建议吗?我们如何使用MirrorMaker或任何其他解决方案在辅助集群中同步消费者组偏
我正在给Kafka写一个msg,然后在另一端消费。在里面做一些过程,并把它写回另一个Kafka主题。 我想知道哪个消息响应是哪个请求... 当前决定捕获来自消费者侧的偏移id然后在响应中写入和读取响应有效载荷并决定相同。 对于这种方法,我们需要阅读每条消息。根据消费者配置条件,我们还有其他方法可以使用吗?
我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?
我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi
我有一个由第三方发布的JMS队列。我想在不同的机器上设置多个使用者,只有一台特定机器的使用者确认该队列上的消息。简而言之,如果特定机器的使用者没有接收到消息,那么该消息不应从队列中删除。这是可以实现的吗?
我有一个kafka主题,3个分区,只有一个带批处理的消费者。我在消费者方面使用的是spring kafka和以下消费者道具: 即使队列中有数千条消息(GBs数据)在等待,kafka consumer在每次轮询中也会收到大约10条消息(总大小约为1MB)。使用者应该获取(在我的示例中为15MB)或(在我的示例中为10000)的批处理。有什么问题?
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没