这与以下问题几乎相同:发送给具有相同消费者组名称的所有消费者的消息。公认的答案是使用Kafka 0.8.1或更高版本,我就是这么做的。 Kafka留档说: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 但是我无法使用 Kafka 0.8.2.1 和 kafkacat 观察到这种行为。 我的设置: Kafka Zookeeper 运行在 spotify
我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正
我正在使用< code > librdkafka C API consumer(特别是使用< code > rd _ Kafka _ consumer _ poll 来读取,在此之前我确实调用了< code > rd _ Kafka _ poll _ set _ consumer ) 我看到的问题是,在我的谷歌测试中,我做了以下操作 > 给Kafka写3条信息 初始化/启动kafka消费者(< c
在消费者行中的消息中出现以下错误 法典: 消费者= KafkaConsumer(topic,group_id=groupid,bootstrap_servers=[host]),用于消费者中的消息: 错误:对于使用者中的消息:文件“\Python\Python38-32\lib\site packages\kafka\cconsumer\group.py”,第1192行,在next return
我设置了一个Spring集成流程来处理一个有3个分区的主题,并将侦听器容器的并发性设置为3。正如所料,我看到三个线程处理来自所有3个分区的批处理。然而,我发现在某些情况下,一个侦听器线程可能处理包含来自多个分区的消息的单个批处理。在kafka中,我的数据是按id划分的,因此它可以与其他id同时处理,但不能在另一个线程上与相同的id一起处理(我很惊讶地发现这种情况正在发生)。通过阅读文档,我认为每个
你好,我正在使用Spring云流编写一个Kafka消费者生产者。在我的消费者内部,我将数据保存到数据库中,如果数据库出现故障,我将手动退出应用程序。重新启动应用程序后,如果数据库仍然关闭,则应用程序将再次停止。现在,如果我第三次重新启动应用程序,中间间隔(两次失败)收到的消息丢失,kafka 消费者会获取最新消息,也会跳过我退出代码的消息。 入站和出站通道绑定器接口 服务等级- 1)生产者服务 2
我们的生产遭遇一个Kafka事件的消费现象。总事件量为34亿个事件,有40个分区。且事件消息几乎均匀地分布在每个分区上,每个分区有8000万个事件。 我们分配了 40 个消费者流和 40 个线程(顺便说一句,我们使用 kafka 客户端 0.8.2)。 在消耗期间,在前4小时,每个分区的延迟不断下降。在最后一个小时,2/3的消费者流已经完成事件消耗。只有不到10个消费者流继续接收剩余事件。对于相关
我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们
我有一个处理器,它从主题中获取json字符串,类型为GenericRecord。现在我把这条河分成两条支流。我采用第一个分支,并将(key,value)映射为2个字符串,其中包含一个特定的json字段和该字段的值,然后按key分组。到目前为止,一切都很好。现在,我必须用用户定义的新类型聚合流,并收到一个异常。 这里是代码: 新类型: 好流: 问题是: 这是例外: 我如何解决这个问题? 更新 ---
我正在尝试使用 Kafka 流对 CDC 数据执行 KTable-KTable 外键连接。我将读取的数据是 Avro 格式,但它的序列化方式与其他行业序列化程序/反序列化程序(例如 Confluent 模式注册表)不兼容,因为模式标识符存储在标头中。 当我设置KTables的Serdes时,我的Kafka Streams应用程序最初运行,但最终失败,因为它在内部使用,而不是包装序列化程序Value
我对kafka制作人有问题。实际上我正在使用Spring kafka,并通过KafkaTemboard leke发送消息: 问题是有时发送消息需要 4-20 秒。有很多消息需要 100 毫秒才能发送。所以我有几个问题: > < li> 消息大小和吞吐量之间是否有关联,这种关系是什么? 我应该首先检查什么,也许我没有很好地调整,任何方向?
我正在使用Spring Boot中的。Java 8 我的主要目的是,消费者不应重复使用信息。 1)调用表获取100行并将其发送到kafka 2) 假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在RETRY机制计时内无法恢复) 因此,当我重新启动Spring启动应用程序时,我如何确保不再发送这70条消息。 一种选择是我可以在数据库表消息 中使用标志。 还有其他有效的方法吗?
我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?
我是Kafka的新用户,我们在应用程序中使用了Spring Web Flux。我们需要向两个不同的主题推送两个不同的消息,比如T1和T2。Kafka经纪人也是一样。我们正在使用ReactiveKafkaProducerTemplate,效果很好。 现在我们只需要单独压缩一个主题[T1]内容,因为消息大小在主题T1上更大。我们是否在响应式Kafka或Project Actor中支持路由KafkaTe
对于Java/Kotlin Spring启动应用程序,如果我想向Kafka发送消息或使用来自Kafka的消息。您建议使用Spring Kafka库还是仅使用Kafka Java API。 不太确定Spring是否提供了更多的好处,或者只是一个包装器?对于Spring,他们提供了很多注释,当遇到一些运行时错误时,这些注释看起来更神奇。 想听一些意见。