当前位置: 首页 > 知识库问答 >
问题:

无法使用kafka命令行将json tweets事件发送到Kafka主题/生产者

江智
2023-03-14

我创建了一个python脚本raw\u tweets\u流。py使用twitter api流式传输twitter数据。twitter上的json数据使用下面的脚本传递给kafka producer。

`python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:2181 --topic raw_json_tweets`

raw\u json\u推文是为这些推文创建的Kafka主题。python脚本raw_tweets_流。py运行正常,但在将其发送给Kafka制作人时抛出错误。我正在使用Hortonworks HDP 2.3.1沙盒,我已经确保zookeeper和kafka已经启动。

/usr/hdp/now/kafka-Broker/bin/kafka-topics.sh--zooKeerlocalhost:2181--描述--主题raw_json_tweets

Topic:raw_json_tweets      PartitionCount:1        ReplicationFactor:1     Configs:
            Topic: raw_json_tweets     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

错误:

[2016-08-25 22:36:26,212] ERROR Failed to send requests for topics raw_json_tweets with correlation ids in [57,64] (kafka.producer.async.DefaultEventHandler)
[2016-08-25 22:36:26,213] ERROR Error in handling batch of 131 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-08-25 22:36:27,217] WARN Fetching topic metadata with correlation id 65 for topics [Set(json_tweets1)] from broker [BrokerEndPoint(0,localhost,2181)] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

更新:解决方案

>

  • 转到Ambari Services并将Kafka日志目录更改为/tmp/kafka-logs
  • 修改了原始脚本以包含正确的端口和主机名。

    python raw\u tweets\u流。py |/usr/hdp/current/kafka-broker/bin/kafka-console-producer。sh——代理列表沙箱。霍顿工厂。com:6667——主题raw\u json\u推特

    已验证使用控制台使用者将事件发送到kafka主题。

    /usr/hdp/2.3.0.0-2557/kafka/bin/kafka-console-consumer.sh-zooKeersandbox.hortonworks.com:2181-主题raw_json_tweets-从头开始

  • 共有1个答案

    宗政安歌
    2023-03-14

    看起来您将--broer-list指向zoomaster(2181),而您需要指向默认端口为9092或Ambari上的6667的kafka代理。

     类似资料:
    • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr

    • 我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?

    • 问题内容: 我正在尝试执行Java邮件,并且收到错误消息“无法将命令发送到SMTP主机”。任何帮助,将不胜感激。以及任何将来可能出现的问题的解决方案。确切的例外是 我的代码如下 问题答案: 服务器需要STARTTLS。如果我使用telnet进行手动SMTP会话,则会得到以下信息: 该服务器不接受未加密的连接

    • 我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如

    • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那

    • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会