我是Kafka新手,我正在使用Kafka1.0。
我使用拉取模式读取kafka消息,也就是说,我定期投票()
查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。
我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)
每个使用者组维护其每个主题分区的偏移量。由于每个消费者组的提交偏移量信息存储在一个名为(默认情况下)消费者偏移量的内部主题中(在v0.9之前,此信息存储在Zookeeper上)。当offset manager收到OffsetCommitRequest时,它会将请求附加到一个名为“code>\uu consumer\u offset”的特殊压缩Kafka主题。最后,只有当偏移主题的所有副本都接收到偏移时,偏移管理器才会向使用者发送成功的偏移提交响应。
我正在实现一个生成Kafka消息的过程,每个消息都应该有由模式注册表验证的模式。为了开发,我使用docker运行kafka和模式注册表,我的模式由模式注册表ui进行注册。 看起来我的模式没有被验证或者我缺少一些配置。我的生产者类有以下代码: 大多数时候,我会收到一个错误,如“Schema not found”,当没有引发此异常时,我的消息不会被验证,它只是将消息发送到另一个主题。 是否缺少任何类型
我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题
我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件
我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?
我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。
本文向大家介绍Kafka 消息是采用 Pull 模式,还是 Push 模式?相关面试题,主要包含被问及Kafka 消息是采用 Pull 模式,还是 Push 模式?时的应答技巧和注意事项,需要的朋友参考一下 Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一