当前位置: 首页 > 知识库问答 >
问题:

Logstash不能从Kafka消费

韶和璧
2023-03-14

我的结构是这样的:日志文件

但我卡在Kafka到Logstash部分。

首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic {topicName} --from-beginning

也可以由命令使用:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic {topicName}

但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出:

2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215]-接受来自/127.0.0.1:53201的插座连接

2017-11-13 16:11:59207[myid:]-警告[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:21 81:NIOServerCnxn@383]-导致会话0x0关闭的异常:null

2017-11-13 16:11:59209[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040]-客户端的闭合套接字连接/127.0.0.1:53201(未为客户端建立会话)

此外,此语句中的端口号不断增加“从/127.0.0.1:53201接受套接字连接”。它正在扫描端口吗?

以下是一些背景。版本:

动物园管理员-3.4.11

Kafka大学2.10-0.10.2.1

logstash-5.4.0

(插件)logstash输入kafka(5.1.6)

这是我的配置:

Zoo.cfg(动物园管理员)

tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181

服务器财产(Kafka)

listeners=PLAINTEXT://localhost:9092
zookeeper.connect=127.0.0.1:2181

日志存储。配置(日志存储)

input {
    kafka {
        bootstrap_servers => "localhost:2181"
        topics => "test"
    }   
}

我想问题是由于动物园管理员和Kafka之间的配置问题,但我不知道如何解决。请帮忙。

共有1个答案

邹驰
2023-03-14

您需要使用localhost:9092作为配置参数bootstrap_servers-您将Logstash指向Zoomaster,但您需要指向Kafka本身。

另一个错误是主题需要一个数组,所以它应该是数组而不是测试

 类似资料:
  • 我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需

  • 我们希望使用logstash获取日志并将其传递给Kafka。 我们已经为logstash1.5.0beta1和kafka 2.9.2_0.8.1.1编写了以下conf文件 ** ** 运行以下命令后:bin/logstash代理-ftest.conf--logex.log test.conf是我们的conf文件。ex.log是我们为要存储的日志创建的空白文件。 我们得到以下输出 发送logstas

  • Kafka在docker容器里工作得很好。我可以使用并成功地创建主题、生成/使用消息,但是当我使用本地kafka脚本从docker容器外部尝试时,我只能创建和列出主题。生成和使用消息会引发错误: 生产: 消耗: 这是我的DockerFile: 脚本/start-kafka.sh

  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理