当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect FileStreamSource 忽略附加的行

赵雅懿
2023-03-14

我正在开发一个使用Spark处理日志的应用程序,我想使用Kafka作为从日志文件中流式传输数据的一种方式。基本上,我有一个日志文件(在本地文件系统上),它会随着新日志不断更新,而Kafka Connect似乎是从文件中获取数据以及新附加行的完美解决方案。

我使用以下命令以默认配置启动服务器:

Zookeeper服务器:

< code > zookeeper-server-start . sh config/zookeeper . properties

动物园管理员属性

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Kafka服务器:

kafka-server-start.sh 配置/服务器属性

server.properties

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

然后我创建了主题“连接测试”:

< code > Kafka-topics . sh-create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic connect-test

最后我运行了Kafka连接器:

connect-standalone.sh配置/connect-standalone.properties配置/connect-file-source.properties

connect-standalone.properties(连接标准属性)

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

连接-文件-源.属性

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test

首先,我通过运行一个简单的控制台消费者来测试连接器:

< code > Kafka-console-consumer . sh-bootstrap-server localhost:9092-topic connect-test-from-beginning

一切都很正常,消费者从文件中接收日志,当我添加日志时,消费者不断更新新的日志。

(然后,我按照本指南以“消费者”的身份尝试Spark:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2个直接接近的无接收器,它仍然很好)

在此之后,我从日志文件中删除了一些日志并更改了主题(我删除了“连接测试”主题,创建了另一个主题并使用新主题编辑了connect-file-source.properties)。

但是现在连接器不再以相同的方式工作。使用控制台使用者时,我只获取文件中已有的日志,并且忽略我添加的每个新行。也许在不更改连接器名称的情况下更改主题(和/或修改日志文件中的数据)会破坏 Kafka 中的某些内容......

这就是Kafka Connect对我的主题“新主题”和连接器“新文件连接器”所做的事情:

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(即使在文件中附加新行后,它也会不断刷新 0 条未完成的消息)

所以我尝试重新开始:我删除了/tmp/kafka日志目录/tmp/connect。偏移文件,并使用了全新的主题名称、连接器名称和日志文件,以防万一。但是,连接器仍然忽略了新的日志……我甚至试图删除我的kafka,从存档中重新提取它,然后再次运行整个过程(以防kafka中发生了变化),但没有成功。

我不知道问题出在哪里,任何帮助都将不胜感激!

共有2个答案

丁志勇
2023-03-14

Kafka Connect 不会“监视”或“跟踪”文件。我不相信它在任何地方都有记录表明它确实这样做了。

我会说它对于读取活动日志比使用 Spark Streaming 观看文件夹更不有用。Spark将“识别”新创建的文件。Kafka Connect FileStreamSource 必须指向单个预先存在的不可变文件。

要让 Spark 处理活动日志,您需要执行“日志轮换”的东西 - 也就是说,当文件达到最大大小或时间段(例如一天)结束等条件时,此过程会将活动日志移动到 Spark 正在监视的目录,然后它处理启动一个新的日志文件,以便您的应用程序继续写入。

如果您希望文件被主动监视并摄入到Kafka中,那么可以使用FileBeat、Fluentd或Apache Flume。

聂溪叠
2023-03-14

根据文档:

FileStream连接器示例旨在向那些作为用户或开发人员首次使用Kafka Connect的用户展示一个简单的连接器是如何运行的。不建议在生产中使用。

我会使用类似Filebeat(带有Kafka输出)的东西来将日志摄取到Kafka中。或者kafka connect spoldir,如果您的日志没有直接附加到,而是放在文件夹中的独立文件,以便接收。

 类似资料:
  • 问题内容: 我想通过nohup.php启动服务器,但是命令未运行,并显示以下错误 nohup:忽略输入并将输出附加到“ nohup.out” 我正在通过腻子使用ssh,这就是我在做什么 nohup php server1.php 问题答案: 这不是错误-这是正常行为。它只是通知您,一旦启动,输入/输出将从控制台中删除。 为了避免出现该消息,您需要像这样启动它

  • 问题内容: 我正在开发一个使用Spring-boot,关系数据库和Elasticsearch的应用程序。 我在代码的2个不同位置使用JSON序列化: 在REST API的响应中。 当代码与Elasticsearch交互时。 我在Elasticsearch中需要一些属性,但我想向应用程序用户隐藏(例如,来自关系数据库的内部ID)。 这是一个实体的例子: 问题 :当对象持久化在Elasticsearc

  • 问题内容: 我正在某个网站上插入一些标题中的换行符。假设我无法编辑源HTML,是否有CSS可以忽略这些中断的方法? 我正在移动网站的优化,所以我真的不想使用JavaScript。 问题答案: 使用css,您可以“隐藏” br标签,它们不会起作用: 如果只想在特定的标题类型中隐藏某些内容,只需使CSS更具体即可。

  • 读取文件已支持 windows 系统,版本号大于等于 1.3.4.1; 扩展版本大于等于 1.2.7; PECL 安装时将会提示是否开启读取功能,请键入 yes; 测试数据准备 $config = ['path' => './tests']; $excel = new \Vtiful\Kernel\Excel($config); ​ // 写入测试数据 $filePath = $excel->f

  • 问题内容: 这是我的实体: 这是我获得人员名单的方式: 如果我正确理解提取图,则它必须仅加载我指定的那些字段。但是,字段“ birthDate”也已加载。此外,我看到在hibernateSQL查询中选择了4列。 如何解决?我使用hibernate 5.1.0作为JPA提供程序。 问题答案: 实体图旨在控制延迟或渴望加载哪些关系(例如,一对一,一对多等)。它们可能不适用于加载各个列(取决于提供程序)

  • 由于某些原因,我无法将CURL与HTTPS一起使用。在我升级curl库之前,一切都很正常。现在,我在尝试执行CURL请求时遇到了这种响应:SSL CA证书(路径?访问权限?)有问题 以下是关于相关问题的建议,我试图做到以下几点: > 启用并指向从http://curl.haxx.se/docs/caextract.html下载的cacert.pem 我也试着用positiveSSL做同样的事情。c