这是关于让生产者知道消息是否已被消费者消费的用例。
我的想法是,每次消费消息时,消费者都会提交偏移量,生产者可以跟踪和读取当前偏移量,以查看是否消费了相应的消息。
此外,请不要犹豫,让我知道这是否是处理用例的正确方法,因为我没有Kafka的经验。我知道Kafka不是为这种方式设计的(处理上述用例),但我必须坚持使用Kafka。
如果问题是标题,答案是否定的,至少在最新版本的kafka中0.9.0.1没有办法知道生产者对分区的偏移量是多少。
假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。 有可能得到哪个是给定生产者生产的最后一个偏移吗? 例如: 生产者: 我想找出分别由P1和P2发布的最后一条记录的偏移量。 请注意,我不是在要求全局主题分区偏移量。
前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol
我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看
如果我只在生产者端发送一条记录并等待,生产者何时将记录发送给经纪人?在Kafka文档中,我找到了名为“linger.ms”的配置,它说: 一旦我们得到 根据以上文件,我有两个问题。 > 如果生产者收到的数据达到batch.size,它会立即触发发送一个只包含一个批次的请求给代理?但是正如我们所知,一个请求可以包含许多批次,那么它是如何发生的呢? 这是否意味着即使是收到的数据也不足以批量处理。大小,
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。