我正在努力解决如何正确使用分区键机制的问题。我的逻辑是将分区号设置为3,然后创建三个分区键作为“0”,“1”,“2”,然后使用分区键创建三个键控消息,例如
之后,创建一个生产者实例来发送所有KeyedMessage。
我希望每个KeyedMessage应该根据不同的分区键进入不同的分区,这意味着
我正在使用Kafka web控制台查看主题状态,但结果与我预期的不同。KeyedMessage仍然随机进入分区,有时两个KeyedMessages会进入同一个分区,即使它们有不同的分区键。
为了让我的问题更清楚,我想发布一些我目前拥有的Scala代码,我正在使用Kafka 0.8.2-beta和Scala 2.10.4。
这是生产者代码,我没有使用自定义partitioner.class:
val props = new Properties()
val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks",requestRequiredAcks.toString)
props.put("client.id",clientId.toString)
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
def html" target="_blank">kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
new KeyedMessage(topic,message)
} else {
new KeyedMessage(topic,partition,message)
}
}
def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
def send(message: Array[Byte], partition: Array[Byte]): Unit = {
try {
producer.send(kafkaMesssage(message, partition))
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
}
以下是我如何使用生产者,创建一个生产者实例,然后使用此实例发送三条消息。目前,我将分区键创建为整数,然后将其转换为字节数组:
val testMessage = UUID.randomUUID().toString
val testTopic = "sample1"
val groupId_1 = "testGroup"
print("starting sample broker testing")
val producer = new KafkaProducer(testTopic, "localhost:9092")
val numList = List(0,1,2);
for (a <- numList) {
// Create a partition key as Byte Array
var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
//Here I give a Array[Byte] key
//so the second "send" function of producer will be called
producer.send(testMessage.getBytes("UTF8"), key)
}
不确定我的逻辑是否正确,或者我没有正确理解分区键机制。任何人都可以提供一些示例代码或解释,那就太好了!
有同样的问题-只需切换到ByteArrayPariator:
props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
默认的分区器查看该键(作为一个字节数组),并使用(% numPartitions)将该值转换为介于0和分区数-1之间的整数值。得到的整数决定了消息被写入的分区,而不是你所做的键值。
人们通常认为分区是一种在业务类别上分离业务数据的方法,但这不是查看分区的正确角度。
分区直接影响这些主题:
-性能(每个分区都可以与其他分区并行使用)
-消息顺序(仅在分区级别保证的消息顺序)
我将举一个如何创建分区的例子:
你有一个话题,说我的消息世界
您想将此主题(所有MyMessagesToWorld)转移给某些消费者。
你称了我的消息世界的整个“质量”,发现这是10公斤。
您在“MyMessagesToWorld”中有以下“业务”类别:
-给爸爸的信息(D)
-给妈妈的信息(M)
-给sis的消息
-给奶奶的启示 (G)
-给老师的启示(T)
-给女朋友的信息(F)
你想,谁是你的消费者,然后发现你的消费者是侏儒,每个人能在一个小时内消耗1公斤的信息。
你最多可以雇佣2个这样的侏儒。
1个gnome需要10小时才能消耗10 kg消息,2个gnome需要5小时。
所以你决定雇佣所有可用的侏儒来节省时间。
为了给这两个侏儒创建两个“频道”,你在Kafka上创建了这个主题的两个分区。如果你看到更多的侏儒,创建更多的分区。
您内部有6个业务类别和2个连续的独立消费者 - 侏儒(消费者线程)。
怎么办呢?
Kafka的方法如下:
假设您在集群中有2个kafka实例。(相同的示例OK,如果您在集群中有更多实例)
您在Kafka上将分区号设置为2,例如(以Kafka 0.8.2.1为例):
您在Kafka中定义您的主题,告诉您该主题有2个分区:
kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld
现在主题MyMessagesToWorld有2个分区:P(0)和P(1)。
你选择了第二个(分区),因为你知道,你只有两个消耗侏儒。
您可以稍后html" target="_blank">添加更多的分区,届时将使用更多的消费者gnome。
不要将Kafka消费者与此类侏儒混淆。
Kafka消费者可以雇用N个侏儒。(N 个并行线程)
现在,您为您的消息创建密钥。
您需要密钥来在分区之间分发您的消息。
键将是这些字母的"业务类别",你之前定义:D,M,S,G,T,F,你认为这样的字母是可以作为ID。
但在一般情况下,任何可能用作键:(复杂对象和字节数组,任何东西...)
如果创建 NO 分区程序,则将使用默认分区程序。
默认分区程序有点愚蠢。
它获取每个KEY的hashcode并将其除以可用分区的数量,“提醒”将删除该键的分区数量。
例子:
KEY M, hashcode=12345, partition for M = 12345 % 2 = 1
正如您可以想象的那样,在最好的情况下,使用这样的分区器,每个分区中有3个业务类别。
在更糟糕的情况下,您可以将所有业务类别都放在1个分区中。
如果您有 100000 个业务类别,则通过此类算法在统计上可以分发它们。
但是只有很少的类别,你可以有不是很公平的分配。
因此,你可以重写partitioner,更明智地分配你的业务类别。
有一个例子:
此分区器在可用分区之间平均分配业务类别。
public class CustomPartitioner {
private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
private static AtomicInteger sequence = new AtomicInteger();
private ReentrantLock lock = new ReentrantLock();
public int partition(ProducerRecord<String, Object> record, Cluster cluster) {
String key = record.key();
int seq = figureSeq(key);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = seq % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
// no partitions are available, give a non-available partition
return seq % numPartitions;
}
}
private int figureSeq(String key) {
int sequentualNumber = 0;
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{//synchronized region
//used only for new Keys, so high waiting time for monitor expected only on start
lock.lock();
try{
if(keyDistributionTable.containsKey(key)){
sequentualNumber = keyDistributionTable.get(key);
}else{
int seq = sequence.incrementAndGet();
keyDistributionTable.put(key, seq);
sequentualNumber = seq;
}
}finally{
lock.unlock();
}
}
return sequentualNumber;
}
}
我想知道,在什么情况下,具有相同分区键的消息会进入不同的分区。 我使用下面给出的命令运行了属于同一组的两个消费者在控制台中监听一个主题: 我使用“纳米/Kafka-php”库将消息放入带有键 的主题。当我发送多个这样的消息时,我发现很少有消息转到第二个消费者,而大多数消息都发送给消费者1。 由于我对所有消息使用相同的密钥,因此我希望所有消息都由同一个使用者使用。每个使用者都绑定到每个分区。 我使用
来自jvisualvm的快照
为什么Eclipse在构建一个Android项目时,会陷入构建工作区的无限循环...和(重新)构建工作区...和(重新)构建工作区... 这是已知的虫子吗? 走出这个循环的正确方法是什么? 备注: 如果取消选中,它会生成良好的版本,甚至导出一个完美运行的签名发行版APK。 此问题仅在我升级到最新的SDK r19后才开始。在此之前(SDK r11),我从未遇到过此问题。 错误160868正好描述了这
当我们基于某个键在流上应用组 by 函数时,kafka 如何计算这一点,因为相同的键可能存在于不同的分区中?我看到了()函数,它基本上对数据进行了重新分区,但我不明白它是什么意思。它会将具有相同键的所有消息移动到单个分区中吗?另外,我们可以通过()方法调用的频率如何?如果有要求,我们可以在收到每条消息后调用它吗?请建议。谢谢
我们希望使用logstash获取日志并将其传递给Kafka。 我们已经为logstash1.5.0beta1和kafka 2.9.2_0.8.1.1编写了以下conf文件 ** ** 运行以下命令后:bin/logstash代理-ftest.conf--logex.log test.conf是我们的conf文件。ex.log是我们为要存储的日志创建的空白文件。 我们得到以下输出 发送logstas
那是我学习Kafka的初期。我正在检查我本地机器中的每一个Kafka属性/概念。 所以我遇到了属性,下面是我的理解。如果我误解了什么,请纠正我。 将消息发送到主题后,必须将消息写入至少关注者数。 还包括引导。 如果可用活动代理的数量(间接地,在同步副本中)少于指定的,则生产者将引发发布消息失败的异常。 以下是我创建上述场景所遵循的步骤 在本地启动了3个代理,代理ID为0、1和2 创建了主题insy