Question:

Kafka fails to start (version 0.8.0 beta1)

陶琦
2023-03-14

I am trying to start a Kafka service in standalone mode (on EC2) with ZooKeeper version 3.3.6. So I run 1) sbt update, 2) sbt package, 3) sbt assembly-package-dependency, then start the ZooKeeper service, and then the Kafka server.
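Roughly, the commands I run are the following (the ZooKeeper path is a placeholder for my actual install location; the rest is run from the Kafka 0.8.0-beta1 source root):

    # build Kafka 0.8.0-beta1 from source
    sbt update
    sbt package
    sbt assembly-package-dependency

    # start standalone ZooKeeper 3.3.6, then the Kafka broker
    /path/to/zookeeper-3.3.6/bin/zkServer.sh start
    ./bin/kafka-server-start.sh config/server.properties

However, I receive the following error messages. Kafka server log: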

ERROR Error while electing or becoming leader on broker 0 (kafka.server.ZookeeperLeaderElector)
java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:84)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
    at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:35)
    at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:503)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:467)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:215)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:89)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

And the ZooKeeper log:

2014-07-15 15:49:22,996 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x57 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,102 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x59 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,109 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,215 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest

And the Kafka producer log:

[2014-07-15 15:49:23,107] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 23 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,107] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,112] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: edwintest (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,112] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,213] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 24 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,213] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,217] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,218] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,219] ERROR Failed to send requests for topics edwintest with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,219] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:254)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

My /etc/hosts configuration:

127.0.0.1       ip-172-32-1-95 localhost.localdomain localhost
::1             localhost6.localdomain6 localhost6

My server.properties file:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/tmp/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false

My ZooKeeper configuration, zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

I have tried deleting all of the Kafka and ZooKeeper state under /tmp/zookeeper and /tmp/kafka-logs (roughly as sketched below) and restarting everything, but I still get the same error.
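The cleanup I do, for reference (destructive: it wipes all topics, logs, and offsets; I stop both services first):

    # stop the broker and ZooKeeper, then remove their on-disk state
    rm -rf /tmp/kafka-logs /tmp/zookeeper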

1 Answer

艾志尚
2023-03-14

Cool! I am guessing you are running kafka-console-producer to publish messages to a topic called "edwintest". Before running the producer, create the topic with the following command:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic edwintest
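You can then confirm the topic exists (this assumes the same kafka-topics.sh script; some 0.8.0 builds ship kafka-list-topic.sh instead):

    ./bin/kafka-topics.sh --list --zookeeper localhost:2181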

Then start your console producer (a minimal invocation is sketched below). Hopefully that solves your problem.
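For example, assuming a broker on localhost:9092 as in your server.properties:

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic edwintest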

[EDIT] Apparently, you must also make sure your EC2 security groups are updated correctly so that the ZooKeeper and Kafka broker ports are open to the producer.
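A quick reachability check from the producer host, assuming the default ports from your configs and that netcat is installed (the host below is a placeholder for your broker's address):

    # verify the ZooKeeper and Kafka broker ports are open
    nc -vz <broker-host> 2181
    nc -vz <broker-host> 9092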
