出于特殊原因,我需要同时使用 - ConsumerGroup
(又名高级消费者)和 SimpleConsumer
(又名低级消费者)来读取 Kafka。对于 ConsumerGroup
,我使用基于 ZooKeeper 的配置,并且对此完全满意,但 SimpleConsumer
需要实例化种子代理。
我不想同时保留动物园管理员和经纪人主机的列表。因此,我正在寻找一种方法,从ZooKeeper中自动发现特定主题的经纪人。
由于一些间接的信息,我相信这些数据存储在ZooKeeper中的以下路径之一:
然而,当我试图从这些节点读取数据时,我得到了序列化错误(为此我使用了< code>com.101tec.zkclient):
组织。I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ...64 省略 原因:java.io.StreamCorruptedException:无效的流标头:7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream。(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ...还有 69 个
我可以毫无问题地编写和读取自定义Java对象(例如字符串),所以我相信这不是客户端的问题,而是编码的问题。因此,我想知道:
要使用shell执行此操作:
zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
=> [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0
原来Kafka是用< code>ZKStringSerializer来读写数据到znodes的。因此,要修复这个错误,我只需将它作为最后一个参数添加到< code>ZkClient构造函数中:
val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)
使用它,我编写了几个有用的函数来发现经纪人ID、他们的地址和其他东西:
import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException
def listBrokers(): List[Int] = {
zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}
def listTopics(): List[String] = {
zkClient.getChildren("/brokers/topics").toList
}
def listPartitions(topic: String): List[Int] = {
val path = "/brokers/topics/" + topic + "/partitions"
if (zkClient.exists(path)) {
zkClient.getChildren(path).toList.map(_.toInt)
} else {
throw new KafkaException(s"Topic ${topic} doesn't exist")
}
}
def getBrokerAddress(brokerId: Int): (String, Int) = {
val path = s"/brokers/ids/${brokerId}"
if (zkClient.exists(path)) {
val brokerInfo = readZkData(path)
(brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
} else {
throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
}
}
def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
if (zkClient.exists(path)) {
val leaderStr = zkClient.readData[String](path)
val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
getBrokerAddress(leaderId)
} else {
throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
}
}
这就是我的一位同事获取Kafka经纪人列表的方法。我认为当你想动态获取经纪人列表时,这是一个正确的方法。
下面是演示如何获取列表的示例代码。
public class KafkaBrokerInfoFetcher {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
}
在由三个代理组成的集群上运行代码会导致
1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
我们最近对我们的Kafka集群进行了Kerbertic化,我们开始在阅读来自代理上的主题的消息时遇到问题。 我们使用的是spring kafka 1.1.2版本和kafka client 0.10.0.1。 在研究了Apache Kafka文档中的建议之后,我在项目中做了以下更改。 在使用者属性中添加了security.protocol SASL_PLAINTEXT。 添加了适当的JAAS文件,并
我已经在ec2上的一台机器上设置了一个kafka zookeeper和3个代理,端口为9092..9094,并且正在尝试使用另一台机器上的主题内容。端口2181(zk)、9092、9093和9094(服务器)对使用者计算机开放。我甚至可以做一个 主题:远程访问分区计数:1 ReplicationFactor:3 Configs:主题:远程访问分区:0 Leader:2 Replicas:2,0,1
我正在尝试设置Kafka connect sink连接器。Kafka connect是Kafka connect worker(confluent-3 . 2 . 0)的一部分。我有一个Kafka broker (confluent-3.2.0)在机器a上运行。我想在另一台机器B上设置Kafka-connect-sink连接器来使用消息,使用一个定制的Kafka-connect-sink连接器ja
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我需要创建一个消费者,能够从多个主题拉和订单消息相对于时间戳(Kafka消息时间戳) 在本例中,我订阅了“主题A”和“主题B”,并按照时间戳的顺序对消息进行排队 现在,只要所有主题只有一个分区,这很容易用这个伪代码来解决: 当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间顺序排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有
如果我有一个主题,它有5个分区,然后我有一个服务消耗这5个分区。然后在consumer,我轮询并返回一个ConsumerRecords数组。 每个单独的ConsumerRecord是否可以来自这5个分区中的任何一个?