我试图用不同的密钥将消息存储到不同的分区。
例如:
ProducerRecord<String, String> rec1 = new ProducerRecord<String, String>("topic", "key1", line);
ProducerRecord<String, String> rec2 = new ProducerRecord<String, String>("topic", "key2", line);
producer.send(rec1);
producer.send(rec2);
但是当我尝试运行我的Producer类时,它总是存储在单个分区中。
根据文档,DefaultPartitioner
使用消息键哈希代码
查找分区。我还看到这个问题Kafka分区键工作不正常, 但我在Kafka Client库的0.9.x版本中找不到<code>ByteArrayPartitioner</code>类。
props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
更新:我正在使用代码动态创建主题。
如果我手动创建一个带有分区的主题,那么它可以正常工作。
如果主题是“动态”创建的,则根据< code>num.partitions参数(默认值为< code>1)创建分区数。而且如果你只有一个分区,所有的数据都会到这个分区。
但是,请记住,即使您有多个分区,一个分区仍然可以分配不同的键!即使您有num-partitions = = num-distinct-keys,也可能存在哈希冲突,将两个不同的键分配给相同的分区(并使一些分区为空)。
如果您想确保不同的键总是指向不同的分区,则需要使用使用者分区器或直接指定分区号。
我试图在我的一个项目中使用Google Maps API,并在指导中设置API凭据,它希望我将应用程序的SHA-1签名证书指纹添加到API中。 在说明中,它说使用下面的命令来获取指纹: 当我尝试使用它时,无论是在Android Studio终端还是在我的应用程序目录中的cmd提示符,我都得到以下错误: 注意,我的应用程序在D:\Projects\Android\AppName中。 我如何获得我的S
我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID: 钥匙 留言 我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中
我有一个Azure CosmosDB SQP API帐户,其中包含一个容器“EmployeeContainer”,分区键为“PersonID”。我在这个容器中有三种不同类型的集合。它们的模式如下所示: 部门-雇员 如何将数据存储在逻辑分区中?PersonId是分区键,所有集合中都有PersonId。那么,人员id为“p1241234”的人员集合中的文档和人员id为“p1241234”的人员-部门集
我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。 CLI还显示 bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。 然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于
文件。当前密钥、web API密钥和服务器密钥之间有什么区别,在哪里可以找到当前密钥FCM控制台?我找不到它。
我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。