我是Kafka的新手,与新的KafkaProducer和KafkaConsumer合作,版本:0.9.0.1
在java中是否有任何方法可以在创建特定主题后更改/更新该主题的分区数。
我不是用zookeeper来制造话题。当发布请求到达时,我的KafkaProducer会自动创建主题。
是的,有可能。您必须访问kafka2.11-0.9.0.1.jar
中的Adminutils
scala类才能添加分区。
adminutils
支持主题中的分区数只能增加。类路径中可能需要kafka2.11-0.9.0.1.jar
、zk-client-0.8.jar
、scala-library-2.11.8.jar
和scala-parser-combinators2.11-1.0.4.jar
jar。
下面的部分代码借鉴了kafka-cloudera示例。
package org.apache.kafka.examples;
import java.io.Closeable;
import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Enforced$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class Test {
static final Logger logger = LogManager.getLogger();
public Test() {
// TODO Auto-generated constructor stub
}
public static void addPartitions(String zkServers, String topic, int partitions) {
try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
if (AdminUtils.topicExists(zkUtils, topic)) {
logger.info("Altering topic {}", topic);
try {
AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$);
logger.info("Topic {} altered with partitions : {}", topic, partitions);
} catch (AdminOperationException aoe) {
logger.info("Error while altering partitions for topic : {}", topic, aoe);
}
} else {
logger.info("Topic {} doesn't exists", topic);
}
}
}
// Just exists for Closeable convenience
private static final class AutoZkClient extends ZkClient implements Closeable {
static int sessionTimeout = 30_000;
static int connectionTimeout = 6_000;
AutoZkClient(String zkServers) {
super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
}
}
public static void main(String[] args) {
addPartitions("localhost:2181", "hello", 20);
}
}
问题内容: 我是Kafka的新手,正在使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1 在创建特定主题之后,java中是否有任何方法可以更改/更新特定主题的分区数。 我没有使用Zookeeper创建主题。当发布请求到达时,我的KafkaProducer会自动创建主题。 如果还不够,我还可以提供更多详细信息 问题答案: 是的,有可能。您必须在中访问scala类以添
Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。
简单问题: 假设我有一个具有3个分区的主题:Topic:StateEvents P1、P2和P3。 让我们假设生产者生成20条消息: 1, 2, 3, ..........20 我的问题是: 当制作人生成这些消息时: 1)每个消息将只在且仅在1个分区?也就是说,1在P1,2在P2,3在P3,然后4在P1,5在P2,6在P3,以此类推? 2)如果#1为真,当消费者订阅时,它将订阅所有分区,以便获得所
我创建了一个包含3个分区的主题 我使用Java制作人同步写入主题 我有一个Java的用户订阅并阅读它 我的键总是一组固定的3个不同的字符串(k1、k2、k3)。但是我的消息总是去分区1或分区2——k1和k2去分区1,k3去分区2。 为什么分区0未使用?
通过Kafka文档和各种其他资源,我了解到Kafka中的消息被组织成主题。此外,主题可以分解为多个分区,每个分区可以托管在不同的服务器上。这提供了冗余和可伸缩性。 我不确定这里的“破碎”这个词是什么意思。这是否意味着,如果添加到主题的消息是,例如“1 2 3 4 5 6 7”,那么在将其分解为分区后,我们将有一个分区仅包含整个主题的子部分。就像一个分区有“1 2 3”,而另一个分区有“4 5 6”
我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p