感谢你在这个问题上的帮助。
我使用的是Kafka 0.8.2
这是我写的制作人代码。
public class cvsKafkaProducerAck {
public static void main(String args[]) {
String topic = "test_topic";
String msg = "test...22";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
//props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG , "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(topic, msg.getBytes());
try
{
RecordMetadata metadata = m_kafkaProducer.send(prMessage).get();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
m_kafkaProducer.close();
}
我们有一个使用node-zookeeper-client库的Kafka代理监控系统。
代理在Zookeeper节点/Brokers/ids
上注册。使用client.getChildren()函数,我们不断地监视该路径,查找正在添加或删除的代理。client.getChildren()
函数可以在Zookeeper节点上放置一个观察者,该观察者将在添加或删除子节点时激发一个事件。
我们的节点应用程序在启动时从/brokers/ids
路径获取已知的代理,然后连续监视路径的更改,在添加或删除代理时发出警报。这是通过使用lodash库将当前代理的数组与上次检索到的代理列表的数组进行比较来完成的,例如。
//this code is fired when the /brokers/ids/ path changes (i.e. a child node is added or removed)
//currentBrokers = list of brokers retrieved when the app started
//newBrokerList = latest list of brokers retrieved from client.getChildren()
var brokersChanged = _.difference(currentBrokers, newBrokerList);
//if 'brokersChanged' contains any values alert that
//these brokers have become unregistered with zookeeper
if(brokersChanged.length > 0){
_.each(brokersChanged, function(broker){
var indexToRemove = _.findIndex(currentBrokers, function(existingBroker) {
if(existingBroker == broker){
return existingBroker;
}
});
currentBrokers.splice(indexToRemove,1);
});
}
else {
//if 'brokersChanged' is empty a new broker has been registered
//with zookeeper
var newBroker = _.difference(data, currentBrokers);
_.each(newBroker, function(broker){
currentBrokers.push(broker);
});
}
//currentBrokers will now contain the updated list of brokers
创建了一个群集,其中有两个代理使用相同的动物园管理员,并试图为主题生成消息,其详细信息如下。 当生产者设置或-1时,,它应该接收代理(领导者和副本)的确认,但当一个代理在制作时手动关闭时,即使在acks=“all”有人能解释这种奇怪行为的原因时,对Kafka制作人也没有任何影响? 经纪人在9091,9092。 下面是Kafka制作人的源代码
我正在尝试仅为代理间kerberos配置Kafka代理。然而,由于它似乎也想通过Kerberos连接到Zookeeper,所以我似乎总是遇到错误。我目前还没有设置任何Zookeeper键。 我的Kafka代理 JAAS 配置如下: 服务器属性 我用上述配置得到的错误如下: 换句话说,我只想要经纪人到经纪人的 kerberos 和经纪人 - 动物园管理员的普通SASL_SSL。这可能吗?
我知道生产者/消费者需要与经纪人交谈以了解分区的领导者。经纪人与zk交谈以告诉他们加入了集群。 是真的吗 经纪人从zk知道谁是给定分区的负责人 zk发现经纪人离开/死亡。然后重新选举领导人,并向所有经纪人发送新的领导人信息 问题: 为什么我们需要经纪人相互沟通?这只是为了让tehy可以移动分区,或者他们也可以互相查询元数据。如果是这样,元数据交换的例子是什么
我有两个< code>kafka 0.10.1的代理集群,之前在我的开发服务器上正确运行< code>zookeeper 3.3.6。 我最近尝试将broker版本升级到最新的,但没有开始。配置没有太大变化 谁能告诉我可能会出什么问题吗。为什么经纪人没有起步? 已更改服务器。代理服务器1上的属性 已更改代理服务器2上的server.properties 注意: 1.Zookeeper正在两台服务器
我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???
在Flink中,我执行以下代码: 我推出3次同样的工作。 如果我用一个代理执行这段代码,它工作得很好,但是用3个broke(在3个不同的机器上)只读取一个分区。 null