当前位置: 首页 > 知识库问答 >
问题:

洛格萨什和Kafka

焦同
2023-03-14

我的服务器机器上运行单节点kafka。我使用以下命令创建主题“bin/kafka-topics.sh--创建--zookeeper本地主机:2181--复制因子1--分区1--主题测试”。我有两个logstash实例正在运行。第一个从一些java应用程序日志文件中读取数据,并将其注入kafka。它工作得很好,我可以使用“bin/kafka-console-consumer.sh——zookeeper localhost:2181——主题测试——从头开始”命令在控制台上查看kafka中的数据。但是另一个从kafka(同一主题“test”)读取并注入elasicsearch的logstash实例失败了。第二个logstash实例无法读取数据。我将其配置文件更改为从kafka读取并在控制台上打印,然后它也不会输出任何内容。以下是失败日志存储的配置文件:

// config file
     input {
        kafka {
        zk_connect => "localhost:2181"
        topic_id => "test" 
        }
        }
        output {
        stdout{}
        }

Logstash既不打印任何内容,也不抛出任何错误。我使用的是Logstash 2.4和kafka 0.10。我使用了《Kafka快速入门指南》(http://kafka.apache.org/documentation.html#quickstart)

共有3个答案

任长卿
2023-03-14

请查看Kafka输入配置选项https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

请找到Kafka数据的Logstash配置,并将其推送到ELK Stack。

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic_name"]
  }
}

output{
    elasticsearch{
        hosts => ["http://localhost:9200/"]
        index => "index_name"
    }
}

希望有帮助!

丁曦
2023-03-14
@wjp 
Hi wjp, I am running single node kafka cluster. There is no Schema Registry running. zookeeper is also running.   I used following command to create topic "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test".  I have two logstash instances running. First one reads data from some java application log file inject the same to the kafka. It works fine, I can see data in kafka on console using "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning" command. But the other logstash instance which reads from kafka(same topic "test") and injects into elasicsearch, is failing. This second instance of logstash fails to read data. I changed its configuration file to read from kafka and print on console, then also it does not output anything.Here is the config file for failing logstash:
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "test" 
}
}
output {
stdout{}
}
Logstash neither print anything nor it throws any error.
I am using Logstash 2.4 and kafka 0.10.
I used kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart)
江曦
2023-03-14

如果您查看Kafka输入插件配置,您可以看到一个重要参数,该参数允许连接到Kafka集群:zk\u connect。

根据文档,它默认设置为localhost:2181。确保将其设置为Kafka群集实例,或者理想情况下设置为多个实例,具体取决于您的设置。

例如,假设您使用JSON主题连接到一个三节点Kafka集群。配置如下:

kafka {
 topic_id => "your_topic"
 zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

此外,为主题配置正确的编解码器也很重要。上面的html" target="_blank">示例将适用于JSON事件。如果您使用Avro,您需要设置另一个参数——编解码器。有关如何配置它的详细信息已在留档页面上详细记录。它基本上需要指向Avro模式文件,可以作为avsc文件或模式注册表endpoint提供(在我看来更好的解决方案)。

如果您有在Kafka环境中运行的模式注册表,您可以将编解码器指向它的url。一个完整的例子是:

kafka {
 codec => avro_schema_registry { endpoint => "http://kc1.host:8081"}
 topic_id => "your_topic"
 zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

希望它能起作用!

 类似资料:
  • 我在兜圈子,同时确定一个最好的“轻量级”路由,同时具有saml和Oauth服务器功能 要求 有一个沉重的saml idp完美运行和集成到应用程序(shib) 用户需要使用一种独特的身份验证组合(是的,基于网络/浏览器,至少在一段时间内,我们预计不会改变这种浏览器要求(嵌入式或其他) 以下哪项是一个好的权衡? > 运行独立的(但在我们的控制下)Oauth服务器——使用shib idp进行身份验证(s

  • 问题内容: 我的联系人在解析RSS和Atom文件时遇到SAX问题。根据他的说法,好像来自Item元素的文本被截断为撇号或有时是带重音的字符。编码似乎也有问题。 我尝试了SAX,但也进行了一些删节操作,但无法进一步挖掘。如果有人曾经解决过这个问题,我将不胜感激。 这是ContentHandler中使用的代码: 编辑:编码问题可能是由于将信息存储在字节数组中,因为我知道Java在Unicode中工作。

  • 问题内容: 我正在制作一个读取XML Internet的Android应用程序。此应用程序使用SAX解析XML。这是我的解析部分代码: 问题是发生SAXException。异常消息如下: org.apache.harmony.xml.ExpatParser $ ParseException:在第4行,第42列:格式不正确(无效的令牌) 但是,如果我将相同的代码放在普通的Java SE应用程序中,则

  • 我创建了一个基于以下内容的本体论: > 每个人都是美丽的,如果他/她的父母之一是美丽的 阿佛洛狄忒是厄洛斯的父母 因此,我们希望爱神也是美丽的!然而,弹丸推理者似乎并不是这样推断的。如果我手动将爱神的类型置为successful的话,它会接受它,但它难道不应该推断它吗? 我的本体论就在这里(将扩展改为.owl)。我还提供了来自Protege的截图: 我错过了什么? 编辑: 我可以看到Eros出现在

  • 所以,我有这段代码: 我在Windows上运行。有趣的是: > 如果我们使用calloc,程序会分配大约2 GB的内存,然后停止(此处照片= (奇怪的测试):如果我们同时使用这两者,它将使用最大值(~400MB~2GB)/2= 然而,如果我在Linux上运行相同的代码,分配会一直持续下去(在600k分配和使用了许多GB之后,它仍然会继续,直到最终被杀死),并且使用的内存量大致相同。 所以我的问题是

  • 基于openresty的web防火墙,通过配合后台保护您的数据安全 项目说明 由于普通的web防火墙通常只是单台的限制, 并不能对集群中的流量进行全局的分析 从而无法达到有效的防止cc的攻击, 攻击者可分散攻击而让单台无法分析出其是否是恶意的攻击 所以需要有中台的分析,才能有效的判断是否为恶意IP,从而可以自动的识别出哪些用户是非法IP, 从而实行自动封禁, 基本上能保证90%以上的CC攻击自动拦