我有一个将数据摄取到elasticsearch中的水槽代理。该代理正在使用spolDir
源代码。还有另一个代理将文件写入elasticsearch代理的spolDir。
随着时间的推移,文件会增加,已处理文件和未处理文件之间的差异也会增加。
我想增加水槽代理处理的事件数量,以加速摄取过程。
这是水槽代理的配置。
代理04.sources=s1
agent04.channels=ch1
agent04.channels = memoryChannel
代理04.通道.内存通道.类型 = 内存
agent04.channels.memoryChannel.capacity=100000
agent 04 . channels . memory channel . transaction capacity = 1000
agent04.sources.s1.channels
agent04.sources.s1.type
agent04.sources.s1.spool /DataCollection/Flume_Cleaner_Output/Json_Elastic
代理04.sources.s1.反序列化程序.maxLineLength = 100000
agent04.sinks=elasticsearch
agent04.sinks.elasticsearch.channel
agent04.sinks.elasticsearch.type=org.css.cssElasticsearchSink
agent04.sinks.elasticsearch.batch尺寸=400
agent04.sinks.elasticsearch.host名称=elastic-node01.css.org
agent04.sinks.elasticsearch.index名称=all_collections
agent04.sinks.elasticsearch.indexType = live_tweets
agent04.sinks.elasticsearch.indexNameBuilder= org.css.sa.flume.elasticsearch.sink.indexNameBuilder.HeaderValueBasedIndexNameBuilder
agent 04 . sinks . elastic search . cluster name = CSS _ rai _ social
agent04.sinks.elasticsearch.serializer=org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSeriezer
agent04.sinks.elasticsearch.cache_period_ms=90d
为什么要使用 spooldir 链接两个 Flume 代理?这将非常慢,并且是一个令人惊讶的配置。在处理每个批次时,您会产生频繁 fsync 的成本。
我建议您使用Avro Sink和Avro Source将它们链接起来。我还会将批量大小增加到至少 1000。(计算机真的很喜欢批处理,Flume被设置为这样做)。
我有一个要求,我想运行以假脱机目录作为源的 Flume 代理。将假脱机目录中的所有文件复制到 HDFS(sink) 后,我希望代理停止,因为我知道所有文件都被推送到通道。我还想每次为不同的假脱机目录运行此步骤,并在目录中的所有文件都标记为 .完成。有没有办法停止水槽剂?
下午好,我在增加Flume的堆大小时遇到了麻烦。结果,我得到: 我增加了“flume-env.sh”和Hadoop/Yarn中定义的堆。运气不好。 有一点要注意,在启动水槽时,Exec(进程构建器?)似乎将堆定义为20Mb。关于如何覆盖它有什么想法吗? 最终,我尝试将Heapsize设置为1512MB。
https://cwiki.apache.org/confluence/display/FLUME/Getting 开始的页面说 HDFS sink 支持追加,但我无法找到有关如何启用它的任何信息,每个示例都在滚动文件上。因此,如果可能的话,我将不胜感激有关如何将水槽附加到现有文件的任何信息) 使现代化 可以将所有滚动属性设置为0,这将使flume写入单个文件,但它不会关闭文件,新记录对其他进程不
我正在构建一个 Spring 启动独立应用程序,该应用程序需要使用来自远程服务器的消息并将其写入 。我正在使用Flume嵌入式代理来可靠地记录消息。但是我收到以下错误。 引起:org.apache.flume.Flume异常:组件类型的com.security.flume.sink.Sy 根据文档仅支持。这是否意味着我们甚至不能编写自定义水槽? 我对Flume相当陌生。我真的很感谢你在这个问题上的
我正在尝试将日志从单台机器上的不同目录收集到本地文件系统文件或 HDFS。 我已经注册了 2 个来源 r1、r2。两个源都指向单通道C1。有一个接收器连接到通道。K1 请找到下面的配置文件: 但是当我使用代理 a1 启动 Flume 时,只有一个源 (r2) 正在启动。水槽代理启动日志: 谢谢
我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法