当前位置: 首页 > 知识库问答 >
问题:

Logstash-Kafka集成不工作

蔡楚
2023-03-14

我们希望使用logstash获取日志并将其传递给Kafka。

我们已经为logstash1.5.0beta1和kafka 2.9.2_0.8.1.1编写了以下conf文件

**

input {    
         file {    
         type => "apache"    
         path => ["/var/log/apache2/access.log", "/var/log/apache2/error.log"]    
     }
}




output {    
    kafka {    
        codec => plain {    
            format => "%{message}"    
        }
    topic_id => "example1"    
    }    
}

**

运行以下命令后:bin/logstash代理-ftest.conf--logex.log

test.conf是我们的conf文件。ex.log是我们为要存储的日志创建的空白文件。

我们得到以下输出

发送logstash日志到ex.log.使用里程碑2输入插件文件。这个插件应该是稳定的,但是如果你看到奇怪的行为,请让我们知道!有关插件里程碑的更多信息,请参见http://logstash.net/docs/1.5.0.beta1/plugin-milestones{: level=

我们尝试在bashrc中设置CLASSPATH。没有工作。请告诉我们哪里出了问题。提前感谢!

共有1个答案

章誉
2023-03-14

您可以检查以下事项:主题示例1是否可用?如果没有,您是否在Kafka中使用了自动创建?检查现有主题,如:

bin/kafka-topics.sh --list --zookeeper localhost:2181

示例1应该在返回的项目之间,如果不是,您也可以手动创建主题。

bin/kafktopics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic example1

要查看Kafka是否收到启动消费者的消息,请执行以下操作:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic example1

我启动了一个简单的logstash实例,它监听并使用消费者中的标准,我可以看到消息是否以kafka的形式到达。这是我的logstash配置:

input {
        stdin { }
}

output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
        topic_id => "example1"
    }
}

希望这能有所帮助

 类似资料:
  • 我读到elasticsearch Rivers/River插件不推荐使用。所以我们不能直接进行elasticsearch-kafka积分。如果我们想这样做,那么我们需要在两者之间有一个java(或任何语言)层,使用它的API将数据从kafka放到弹性搜索。 另一方面,如果我们有kafka-logstash-elasticsearch,那么我们可以去掉上面的中间层,并通过logstash来实现,只需

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.

  • 我配置了logstash转发器并尝试解析日志文件。希望我通过查阅日志正确地完成了所有步骤。在运行logstash配置文件和logstash转发器时,我没有看到任何错误,但是文件解析没有启动。 以下是我的配置详细信息 日志存储的输出 sudo-bin/logstash-f/etc/logstash/central。使用里程碑1输入插件“lumberjack”进行配置。这个插件应该可以工作,但会从像你

  • 我试图使用Spring/AspectJ集成,但运气不好。Spring版本是3.2.17(是的,我知道有点旧)。 以下是我的相关配置: pom.xml: 应用程序上下文。xml: spect.java(相关类): 我倒了很多在线教程,运气不好。有人能指出我做错了什么吗? 杰森

  • 一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的

  • 提供核心的 Storm 和Trident 的spout实现,用来从Apache Kafka 0.8x版本消费数据. ##Spouts 我们支持 Trident 和 core Storm 的spout.对于这两种spout实现,我们使用BorkerHosts接口来跟踪Kafka broker host partition 映射关系,用KafkaConfig来控制Kafka 相关参数. ###Brok