KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message));
producer.send(keyedMessage);
当前,我正在发送不带任何密钥的邮件作为密钥邮件的一部分,它是否仍能与delete.retention.ms
一起工作?我需要发送密钥作为消息的一部分吗?把key作为信息的一部分好吗?
如果您需要一个键的强顺序,并且正在开发状态机之类的东西,那么键通常是有用的/必要的。如果您要求具有相同密钥(例如,唯一id)的消息始终以正确的顺序显示,那么将密钥附加到消息将确保具有相同密钥的消息始终进入主题中的相同分区。Kafka保证分区内的秩序,但不保证主题中的分区之间的秩序,因此不提供键--这将导致分区之间的循环分布--也不会维持这样的秩序。
在状态机的情况下,键可以与log.cleaner.enable一起使用,以便对具有相同键的条目进行重复数据删除。在这种情况下,Kafka假设您的应用程序只关心给定键的最新实例,并且只有当该键不为空时,日志清理器才删除给定键的旧副本。这种形式的日志压缩由log.Cleaner.Delete.Retention属性控制,并且需要键。
或者,更常见的属性log.retention.hours(默认情况下启用)通过删除日志中过期的完整段来工作。在这种情况下,不需要提供密钥。Kafka将简单地删除日志中比给定的保留期更早的块。
这就是说,如果您启用了日志压缩,或者对具有相同键的消息要求严格的顺序,那么您肯定应该使用键。否则,空键可以提供更好的分布,并在某些键可能比其他键出现更多的情况下防止潜在的热点问题。
如何保持一个连续的流,以“反应”新的丢弃的文件?(或其他事件,如HTTP GET请求或类似的事件)... 例如,如果我不返回PublisherBuilder的实例,而是返回一个整数,那么我的kafka主题将由一个非常巨大的整数值流填充。这就是为什么示例在发送消息时使用一些间隔... 我应该使用一些CompletationStage或CompletableFuture吗?RXJava2?使用哪个li
我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢
我每个人。 我问你是因为我在Netty的解码器中的ByteBuf有问题。 我想解码我的消息谁到达一个服务器,但ByteBuf不工作,因为我会。 问题是ByteBuf没有获取消息的所有字节。 我解释说,我有一条长度为1221字节的消息(这是一个示例),但缓冲区大小只有64字节。 当我试图阅读,缓冲区与我的长度和我有这样的错误: 我认为Netty没有时间阅读所有内容并只发送部分消息,但我现在不知道是否
我有一个网页,需要发送Kafka信息到一个主题。网络正在使用vuejs。我尝试使用npm“Kafka节点”和“Kafka”,它们在建立Kafka连接时都有错误。也许它们都是服务器端npm? 是否有任何js软件包支持网页扮演Kafka制作人的角色。我不想设置其他中间服务器(比如kafka http proxy)。我希望网页直接发送信息到主题。可行吗
Avro对单个Kafka主题的信息进行编码,单个分区。这些消息中的每一条都只能由特定的消费者使用。对于ex,关于这个主题的消息a1、a2、b1和c1,有3个消费者,分别名为A、B和C,每个消费者将获得所有消息,但最终A将使用a1和a2、b1上的B和c1上的C。 我想知道当在Kafka上使用avro时,这是如何典型地解决的: 让使用者反序列化消息,然后由某个应用程序逻辑决定使用消息还是删除消息 使用