我对阿帕奇·Kafka很陌生,目前正在读《学习阿帕奇·Kafka》,第2版(2015年)。第3章,段落Kafka设计基础说:
使用者总是按顺序使用来自特定分区的消息,并确认消息偏移量。这个确认意味着使用者已经使用了所有先前的消息。使用者向代理发出包含要消费的消息的偏移量的异步拉请求,并获得字节的缓冲区。
我有点被‘承认’这个词搞糊涂了。我是否正确理解为Kafka首先发送偏移量,然后消费者使用偏移量列表来拉动请求它尚未消费的数据?
先谢谢你,
尼克
启动时,kafkaconsumer
向在此使用者上配置的特定使用者组的代理发出偏移量查找请求。如果返回有效的偏移量,则使用这些偏移量。否则,使用者根据auto.offset.reset
参数使用初始偏移量。
之后,偏移量主要保持在消费者的内存中。每个poll()
向代理发送当前偏移量,并且在代理回复时,使用者更新内存中的偏移量。
此外,内存中的偏移量不时被提交/加密到代理。如果启用了自动提交,则这可能在poll()
内自动发生,或者必须显式调用commit()
将偏移量发送到代理以可靠地存储它们。
我想重置AerospikeSink Kafka Connector偏移量,我首先删除连接器消费组()偏移量,然后重新创建它。当我使用策略重新创建时,它以正确的偏移量重新创建,但是然后,当任务状态从更改为任务时,它会从连接器的前一个实例到达的点继续处理,这会阻止从一开始就读取来自kafka的所有消息(我正在尝试再次读取来自kafka的所有消息)。 注意:使用新名称创建新连接器并不能解决问题。 使用任
我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地
我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。
我在JAVA中通过TCP接收字节数据包时遇到了一些问题。我的TCPServer类发送207字节的数据包。当我发送一个数据包时,控制台中的程序显示“读取207字节的数据包”然后停下来。在下一个数据包继续执行时,显示“多重测量”和“读取1868767867字节数据包”。之后,接收将永远停止。我不知道它为什么接收1868767867字节。我在wireshark中检查它,服务器总是发送207字节。 这是我
我有一个Kafka接收器任务,通过方法收听Kafka主题 但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑 从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。 我可以在Kafka consumer中看到方法,但在中找不到替代方法。
问题内容: 我正在使用套接字连接我的Android应用程序(客户端)和Java后端服务器。每次与服务器通信时,我都希望从客户端发送两个数据变量。 1)某种消息(由用户通过界面定义) 2)消息的语言(由用户通过界面定义) 我该如何发送这些消息,以便服务器将每个消息解释为一个单独的实体? 在读取了服务器端的数据并做出了适当的结论之后,我想向客户端返回一条消息。(我想我会没事的) 因此,我的两个问题是如