下面给出的kafka producer程序不是通过Eclipse在Windows中运行的,而是在Unix平台上运行的(即,当我在承载kafka代理的Unix中运行它时,它工作正常)。windows不支持Kafka制作人吗?但是,我可以从windows计算机ping ip地址。请帮忙。
package kpkg;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KProducer {
/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub
Properties props = new Properties();
props.put("metadata.broker.list", "10.xx.xx.xx:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
KeyedMessage<String, String> km = new KeyedMessage<String, String>("SriKafkaTopic" , "SriFirstMessage");
producer.send(km);
producer.close();
}
}
这是我得到的异常错误。
log4j:WARN找不到记录器的附加程序(kafka.utils.VerifiableProperties)。log4j:警告请正确初始化log4j系统。SLF4J:未能加载类“org.SLF4J.impl.StaticLoggerBinder”。SLF4J:默认为无操作(NOP)记录器实现SLF4J:请参阅http://www.slf4j.org/codes.html#StaticLoggerBinder详情请参阅。线程“main”kafka中出现异常。常见的FailedToSendMessageException:尝试3次后发送消息失败。在Kafka。制作人异步的。DefaultEventHandler。kafka上的句柄(DefaultEventHandler.scala:90)。制作人制作人在Kafka发送(制作人:scala:76)。javaapi。制作人制作人以kkg的速度发送(生产商scala:33)。KProducer。main(KProducer.java:30)
以下是服务器的内容。日志
[2014-06-06 16:24:07,342] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,408] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,409] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,409] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,409] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,409] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,409] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,410] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,411] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,411] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,411] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-06-06 16:24:07,432] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-06-06 16:24:07,434] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2014-06-06 16:24:07,788] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/tmp/kafka-logs' (kafka.log.LogManager)
[2014-06-06 16:24:07,789] INFO Loading log 'SriKafkaTopic-0' (kafka.log.LogManager)
[2014-06-06 16:24:07,820] INFO Completed load of log SriKafkaTopic-0 with log end offset 11 (kafka.log.Log)
[2014-06-06 16:24:07,854] INFO Loading log 'test-0' (kafka.log.LogManager)
[2014-06-06 16:24:07,855] INFO Completed load of log test-0 with log end offset 2 (kafka.log.Log)
[2014-06-06 16:24:07,855] INFO Loading log 'test1-0' (kafka.log.LogManager)
[2014-06-06 16:24:07,856] INFO Completed load of log test1-0 with log end offset 0 (kafka.log.Log)
[2014-06-06 16:24:07,856] INFO Loading log 'test2-0' (kafka.log.LogManager)
[2014-06-06 16:24:07,857] INFO Completed load of log test2-0 with log end offset 1 (kafka.log.Log)
[2014-06-06 16:24:07,858] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-06-06 16:24:07,861] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-06-06 16:24:07,887] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-06-06 16:24:07,897] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-06-06 16:24:07,974] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-06-06 16:24:08,004] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-06-06 16:24:08,307] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-06-06 16:24:08,339] INFO Registered broker 0 at path /brokers/ids/0 with address master:9092. (kafka.utils.ZkUtils$)
[2014-06-06 16:24:08,346] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-06-06 16:24:08,666] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[SriKafkaTopic,0],[test2,0],[test1,0] (kafka.server.ReplicaFetcherManager)
[2014-06-06 16:24:08,739] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0],[SriKafkaTopic,0],[test2,0],[test1,0] (kafka.server.ReplicaFetcherManager)
[2014-06-06 16:32:06,387] INFO Closing socket connection to /10.87.145.30. (kafka.network.Processor)
[2014-06-06 16:32:06,894] INFO Closing socket connection to /10.87.145.30. (kafka.network.Processor)
[2014-06-06 16:32:06,993] INFO Closing socket connection to /10.87.145.30. (kafka.network.Processor)
[2014-06-06 16:32:07,093] INFO Closing socket connection to /10.87.145.30. (kafka.network.Processor)
[2014-06-06 16:32:07,193] INFO Closing socket connection to /10.87.145.30. (kafka.network.Processor)
这是本书的内容
2014-06-06T16:24:07.689+0530: 1.216: [GC 1.216: [ParNew: 26240K->738K(29504K), 0.0131100 secs] 26240K->738K(1045312K), 0.0131890 secs] [Times: user=0.02 sys=0.00, real=0.02 secs]
2014-06-06T16:24:08.475+0530: 2.002: [GC 2.002: [ParNew: 26978K->1175K(29504K), 0.0140800 secs] 26978K->1175K(1045312K), 0.0141450 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
2014-06-06T16:24:08.494+0530: 2.021: [GC [1 CMS-initial-mark: 0K(1015808K)] 1353K(1045312K), 0.0028850 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
2014-06-06T16:24:08.506+0530: 2.034: [CMS-concurrent-mark-start]
2014-06-06T16:24:08.657+0530: 2.185: [CMS-concurrent-mark: 0.148/0.151 secs] [Times: user=0.15 sys=0.00, real=0.15 secs]
2014-06-06T16:24:08.657+0530: 2.185: [CMS-concurrent-preclean-start]
2014-06-06T16:24:08.668+0530: 2.195: [CMS-concurrent-preclean: 0.011/0.011 secs] [Times: user=0.01 sys=0.00, real=0.01 secs]
2014-06-06T16:24:08.668+0530: 2.195: [CMS-concurrent-abortable-preclean-start]
CMS: abort preclean due to time 2014-06-06T16:24:13.672+0530: 7.200: [CMS-concurrent-abortable-preclean: 0.922/5.004 secs] [Times: user=0.91 sys=0.02, real=5.01 secs]
2014-06-06T16:24:13.672+0530: 7.200: [GC[YG occupancy: 8887 K (29504 K)]2014-06-06T16:24:13.672+0530: 7.200: [GC 7.200: [ParNew: 8887K->1559K(29504K), 0.0098440 secs] 8887K->1559K(1045312K), 0.0099560 secs] [Times: user=0.01 sys=0.00, real=0.01 secs]
7.210: [Rescan (parallel) , 0.0109290 secs]7.221: [weak refs processing, 0.0000060 secs]7.221: [class unloading, 0.0014740 secs]7.222: [scrub symbol & string
这是状态变化的内容。日志
[2014-06-06 16:24:08,198] TRACE Controller 0 epoch 8 started leader election for partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,216] ERROR Controller 0 epoch 8 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:335)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:184)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:98)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:95)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:95)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:311)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:630)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.startup(KafkaController.scala:626)
at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2014-06-06 16:24:08,217] TRACE Controller 0 epoch 8 started leader election for partition [test2,0] (state.change.logger)
[2014-06-06 16:24:08,224] ERROR Controller 0 epoch 8 initiated state change for partition [test2,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test2,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:335)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:184)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:98)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:95)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:95)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:311)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:630)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.startup(KafkaController.scala:626)
at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2014-06-06 16:24:08,225] TRACE Controller 0 epoch 8 started leader election for partition [SriKafkaTopic,0] (state.change.logger)
[2014-06-06 16:24:08,235] ERROR Controller 0 epoch 8 initiated state change for partition [SriKafkaTopic,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [SriKafkaTopic,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:335)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:184)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:98)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:95)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:95)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:311)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:630)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.startup(KafkaController.scala:626)
at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2014-06-06 16:24:08,243] TRACE Controller 0 epoch 8 started leader election for partition [test1,0] (state.change.logger)
[2014-06-06 16:24:08,268] ERROR Controller 0 epoch 8 initiated state change for partition [test1,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test1,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:335)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:184)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:98)
at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:95)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:95)
at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:311)
at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:161)
at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:630)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:626)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.startup(KafkaController.scala:626)
at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2014-06-06 16:24:08,433] TRACE Controller 0 epoch 8 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:6,ControllerEpoch:7) with correlationId 3 to broker 0 for partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,462] TRACE Controller 0 epoch 8 changed state of replica 0 for partition [test,0] from ReplicaDeletionIneligible to OnlineReplica (state.change.logger)
[2014-06-06 16:24:08,463] TRACE Controller 0 epoch 8 changed state of replica 0 for partition [test2,0] from ReplicaDeletionIneligible to OnlineReplica (state.change.logger)
[2014-06-06 16:24:08,463] TRACE Controller 0 epoch 8 changed state of replica 0 for partition [SriKafkaTopic,0] from ReplicaDeletionIneligible to OnlineReplica (state.change.logger)
[2014-06-06 16:24:08,464] TRACE Controller 0 epoch 8 changed state of replica 0 for partition [test1,0] from ReplicaDeletionIneligible to OnlineReplica (state.change.logger)
[2014-06-06 16:24:08,472] TRACE Controller 0 epoch 8 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:6,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,472] TRACE Controller 0 epoch 8 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:4,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test2,0] (state.change.logger)
[2014-06-06 16:24:08,472] TRACE Controller 0 epoch 8 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:3,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [SriKafkaTopic,0] (state.change.logger)
[2014-06-06 16:24:08,473] TRACE Controller 0 epoch 8 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:5,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test1,0] (state.change.logger)
[2014-06-06 16:24:08,473] TRACE Controller 0 epoch 8 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:6,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,473] TRACE Controller 0 epoch 8 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:4,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test2,0] (state.change.logger)
[2014-06-06 16:24:08,474] TRACE Controller 0 epoch 8 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:3,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [SriKafkaTopic,0] (state.change.logger)
[2014-06-06 16:24:08,491] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:6,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) for partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 8 with correlation id 3 (state.change.logger)
[2014-06-06 16:24:08,491] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:4,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) for partition [test2,0] in response to UpdateMetadata request sent by controller 0 epoch 8 with correlation id 3 (state.change.logger)
[2014-06-06 16:24:08,492] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:3,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) for partition [SriKafkaTopic,0] in response to UpdateMetadata request sent by controller 0 epoch 8 with correlation id 3 (state.change.logger)
[2014-06-06 16:24:08,492] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:5,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) for partition [test1,0] in response to UpdateMetadata request sent by controller 0 epoch 8 with correlation id 3 (state.change.logger)
[2014-06-06 16:24:08,497] TRACE Controller 0 epoch 8 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:5,ControllerEpoch:7) with correlationId 4 to broker 0 for partition [test1,0] (state.change.logger)
[2014-06-06 16:24:08,498] TRACE Controller 0 epoch 8 started leader election for partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,568] TRACE Controller 0 epoch 8 elected leader 0 for Offline partition [test,0] (state.change.logger)
[2014-06-06 16:24:08,570] TRACE Controller 0 epoch 8 changed partition [test,0] from OfflinePartition to OnlinePartition with leader 0 (state.change.logger)
[2014-06-06 16:24:08,570] TRACE Controller 0 epoch 8 started leader election for partition [test2,0] (state.change.logger)
我正面临着完全相同的问题,我解决了它通过设置指定以下两个属性在server.properties的经纪人:
hostname
advertised.host.name
advertised.port
P. S.请点击以下链接了解更多详情:
https://kafka.apache.org/documentation.html#brokerconfigs
有几个检查,你应该执行:
>
确保您的Unix计算机上没有运行防火墙:
服务iptable停止
确保您的Linux计算机具有可从Windows计算机解析的主机名。在将代理主机名(hostname实用程序返回的确切名称)放入客户机的/etc/hosts之前,我们也遇到了类似的问题(尽管对于消费者)。
我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。 我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。 my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口 记录通过 然后我通过 组中的第一个窗口转换为7:00-7:05 当我通过控制台消费者
我目前正在试验java awt机器人,现在我想尝试在我的游戏窗口中按下一些东西。 为了蒸汽。exe时,我设置了以下属性:“与Windows 7的兼容性”、“始终以管理员身份运行”。 然后,我用这个启动了游戏“反击:全球进攻”。bat文件: 游戏启动了,现在我想用下面的代码点击游戏中的一些东西: 鼠标会转到正确的位置,但当它应该点击某个东西时,游戏不会做出反应或识别它。 然而,如果我不把蒸汽。exe
我对kafka制作人有问题。实际上我正在使用Spring kafka,并通过KafkaTemboard leke发送消息: 问题是有时发送消息需要 4-20 秒。有很多消息需要 100 毫秒才能发送。所以我有几个问题: > < li> 消息大小和吞吐量之间是否有关联,这种关系是什么? 我应该首先检查什么,也许我没有很好地调整,任何方向?
我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点
我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K
我是新手。 这是我的设置:/apache2.2php5.3。6窗口7/ 我在Apache/htdocs/test/index中有以下代码。php 我使用PoEdit在locale/de_de/LC_MESSAGES/MESSAGES下生成必要的翻译。po 当我访问,结果是Hello World!当它应该是霍尔贴边! 作为测试,我打开命令提示符并导航到测试文件夹。然后我打了进去 控制台中显示的结果是