我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量:
ListenableFuture<SendResult<String, Message>> future = messageTemplate.sendDefault(id, message);
SendResult<String, Message> sendResult = future.get();
String offset = sendResult.getRecordMetadata().offset();
我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka?
我还在producer中使用了MessageTemplate.flush();
我发现,当Kafka broker的topic leader下降时,Kafka将重新平衡自己,另一个broker将成为该分区的leader,如果ack
配置未设置为all
,则在此过程中有可能丢失一些数据。因此将配置更改为
ack=all
此外,如果同步副本中的最小值小于2,则有可能丢失数据,因此将其至少设置为2。
min.insync.replicas = 2
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢
假设kafka消息生产者向一个主题发送一条事件消息。然后一个消费者处理这个事件消息。但是,这个消费者进程因为业务错误而抛出异常,所以他想让消息生产者知道它并再次怨恨。 有什么解决办法吗?
我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”
我试图使用谷歌云计算引擎VM实例作为Kafka消费者。我发现虚拟机阻止了来自任何外部计算机的通信,我成功地设置了防火墙规则,从本地计算机访问虚拟机。 我能够在云虚拟机实例上创建和列出主题。但我无法收发Kafka主题的信息。它抛出超时异常。 我使用telnet检查端口是否打开,并获得了端口的转义序列(9092)。 当我尝试使用另一个云虚拟机实例实现相同的事情时,我能够执行所有kafka操作。(发送/