我在用Kafka。
我有10k个jsons列表,
for(int i=0 ;i< jsonList.size(); i++){
ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList[i]);
producer.send(record);
}
ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList);
producer.send(record);
我该怎么做呢?
谢谢
正式使用KafkaProducer
和ProducerRecord
不能这样做,但可以在ProducerConfig
中配置一些属性
document producer将记录批处理成发送到同一分区的请求,并立即发送它们
每当多个记录被发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求。这有助于客户机和服务器的性能。此配置以字节为单位控制默认批处理大小。不会尝试批处理大于此大小的记录。
生产者将在请求传输之间到达的任何记录组合到一个单独的批处理请求中。此设置通过添加少量的人为延迟来实现这一点--也就是说,生产者将等待到给定的延迟,以允许发送其他记录,从而可以将发送的记录批处理在一起,而不是立即发送一条记录。这个设置给出了批处理延迟的上限:一旦我们获得分区的Batch.Size worth记录,它将立即发送,而不管这个设置如何。
我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”
我试图使用谷歌云计算引擎VM实例作为Kafka消费者。我发现虚拟机阻止了来自任何外部计算机的通信,我成功地设置了防火墙规则,从本地计算机访问虚拟机。 我能够在云虚拟机实例上创建和列出主题。但我无法收发Kafka主题的信息。它抛出超时异常。 我使用telnet检查端口是否打开,并获得了端口的转义序列(9092)。 当我尝试使用另一个云虚拟机实例实现相同的事情时,我能够执行所有kafka操作。(发送/
我有一个网页,需要发送Kafka信息到一个主题。网络正在使用vuejs。我尝试使用npm“Kafka节点”和“Kafka”,它们在建立Kafka连接时都有错误。也许它们都是服务器端npm? 是否有任何js软件包支持网页扮演Kafka制作人的角色。我不想设置其他中间服务器(比如kafka http proxy)。我希望网页直接发送信息到主题。可行吗
我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。