我的结构是这样的:日志文件
但我卡在Kafka到Logstash部分。
首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic {topicName} --from-beginning
也可以由命令使用:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic {topicName}
但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出:
2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215]-接受来自/127.0.0.1:53201的插座连接
2017-11-13 16:11:59207[myid:]-警告[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:21 81:NIOServerCnxn@383]-导致会话0x0关闭的异常:null
2017-11-13 16:11:59209[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040]-客户端的闭合套接字连接/127.0.0.1:53201(未为客户端建立会话)
此外,此语句中的端口号不断增加“从/127.0.0.1:53201接受套接字连接”。它正在扫描端口吗?
以下是一些背景。版本:
动物园管理员-3.4.11
Kafka大学2.10-0.10.2.1
logstash-5.4.0
(插件)logstash输入kafka(5.1.6)
这是我的配置:
Zoo.cfg(动物园管理员)
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
服务器财产(Kafka)
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=127.0.0.1:2181
日志存储。配置(日志存储)
input {
kafka {
bootstrap_servers => "localhost:2181"
topics => "test"
}
}
我想问题是由于动物园管理员和Kafka之间的配置问题,但我不知道如何解决。请帮忙。
您需要使用localhost:9092
作为配置参数bootstrap_servers
-您将Logstash指向Zoomaster,但您需要指向Kafka本身。
另一个错误是主题需要一个数组,所以它应该是数组而不是测试
我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需
我们希望使用logstash获取日志并将其传递给Kafka。 我们已经为logstash1.5.0beta1和kafka 2.9.2_0.8.1.1编写了以下conf文件 ** ** 运行以下命令后:bin/logstash代理-ftest.conf--logex.log test.conf是我们的conf文件。ex.log是我们为要存储的日志创建的空白文件。 我们得到以下输出 发送logstas
Kafka在docker容器里工作得很好。我可以使用并成功地创建主题、生成/使用消息,但是当我使用本地kafka脚本从docker容器外部尝试时,我只能创建和列出主题。生成和使用消息会引发错误: 生产: 消耗: 这是我的DockerFile: 脚本/start-kafka.sh
我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理